This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38ee097  [FLINK-12970][hive] Support writing Hive complex types
38ee097 is described below

commit 38ee097a408a7181631103a69c5914aac2fd3ff8
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 25 20:45:26 2019 +0800

    [FLINK-12970][hive] Support writing Hive complex types
    
    This PR supports writing to Hive tables with complex data types.
    
    This closes #8875.
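
For illustration, a condensed sketch of the write path exercised by the new test, adapted from HiveTableSinkTest: the JobConf (built from a HiveConf), the RowTypeInfo matching the table schema, and a pre-created, non-partitioned Hive table "default.dest" with columns a ARRAY<INT>, m MAP<INT, STRING>, s STRUCT<f1:INT, f2:STRING> are assumed to be set up as in that test.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.batch.connectors.hive.HiveTableSink;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComplexTypeWriteSketch {

	// Writes a single row with ARRAY, MAP and ROW (struct) columns into the
	// existing table "default.dest". jobConf, rowTypeInfo and partitionColumns
	// are assumed to be prepared the same way HiveTableSinkTest prepares them.
	public static void writeComplexRow(JobConf jobConf, RowTypeInfo rowTypeInfo,
			List<String> partitionColumns) throws Exception {
		Row row = new Row(3);
		row.setField(0, new Object[]{1, 2, 3});          // a ARRAY<INT>
		Map<Integer, String> map = new HashMap<>();
		map.put(1, "a");
		row.setField(1, map);                            // m MAP<INT, STRING>
		Row struct = new Row(2);
		struct.setField(0, 3);
		struct.setField(1, "c");
		row.setField(2, struct);                         // s STRUCT<f1:INT, f2:STRING>

		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
		tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(Collections.singletonList(row), rowTypeInfo));
		tableEnv.registerTableSink("complexSink",
				new HiveTableSink(jobConf, rowTypeInfo, "default", "dest", partitionColumns));
		tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
		execEnv.execute();
	}
}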
---
 .../connectors/hive/HiveTableOutputFormat.java     |  23 +++--
 .../table/catalog/hive/util/HiveTableUtil.java     |  27 -----
 .../table/catalog/hive/util/HiveTypeUtil.java      |  14 +--
 .../functions/hive/conversion/HiveInspectors.java  |  36 ++++++-
 .../batch/connectors/hive/HiveTableSinkTest.java   | 114 ++++++++++++++++++---
 5 files changed, 155 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index d0ba4b0..a0fdf56 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -32,6 +32,9 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -133,6 +136,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 
        private transient String hiveVersion;
 
+       // to convert Flink object to Hive object
+       private transient HiveObjectConversion[] hiveConversions;
+
        public HiveTableOutputFormat(JobConf jobConf, String databaseName, String tableName, List<String> partitionColumns,
                                                                RowTypeInfo rowTypeInfo, HiveTablePartition hiveTablePartition,
                                                                Properties tableProperties, boolean overwrite) {
@@ -274,21 +280,24 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                        dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
                }
 
-               List<ObjectInspector> objectInspectors = new ArrayList<>();
-               for (int i = 0; i < rowTypeInfo.getArity() - partitionColumns.size(); i++) {
-                       objectInspectors.add(HiveTableUtil.getObjectInspector(LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i))));
+               numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+               hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
+               List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+               for (int i = 0; i < numNonPartitionColumns; i++) {
+                       DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
+                       ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+                       objectInspectors.add(objectInspector);
+                       hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
                }
 
                if (!isPartitioned) {
                        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                                Arrays.asList(rowTypeInfo.getFieldNames()),
                                objectInspectors);
-                       numNonPartitionColumns = rowTypeInfo.getArity();
                } else {
                        rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                                Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
                                objectInspectors);
-                       numNonPartitionColumns = rowTypeInfo.getArity() - partitionColumns.size();
                }
                hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
        }
@@ -385,11 +394,11 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
                outputCommitter.commitJob(jobContext);
        }
 
-       // converts a Row to a list so that Hive can serialize it
+       // converts a Row to a list of Hive objects so that Hive can serialize it
        private Object getConvertedRow(Row record) {
                List<Object> res = new ArrayList<>(numNonPartitionColumns);
                for (int i = 0; i < numNonPartitionColumns; i++) {
-                       res.add(record.getField(i));
+                       res.add(hiveConversions[i].toHiveObject(record.getField(i)));
                }
                return res;
        }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d4ed021..c2597a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.catalog.hive.util;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.DataType;
 
@@ -29,13 +28,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -139,25 +133,4 @@ public class HiveTableUtil {
                return partition;
        }
 
-       /**
-        * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
-        */
-       public static ObjectInspector getObjectInspector(DataType flinkType) throws IOException {
-               return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
-       }
-
-       // TODO: reuse Hive's TypeInfoUtils?
-       private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
-               switch (type.getCategory()) {
-
-                       case PRIMITIVE:
-                               PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
-                               return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
-
-                       // TODO: support complex types
-                       default:
-                               throw new IOException("Unsupported Hive type category " + type.getCategory());
-               }
-       }
-
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index aa8ee4b..b9f8a57 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.KeyValueDataType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -165,14 +166,15 @@ public class HiveTypeUtil {
                }
 
                if (dataType instanceof FieldsDataType) {
-                       Map<String, DataType> fieldDataTypes = ((FieldsDataType) dataType).getFieldDataTypes();
+                       FieldsDataType fieldsDataType = (FieldsDataType) dataType;
+                       // need to retrieve field names in order
+                       List<String> names = ((RowType) fieldsDataType.getLogicalType()).getFieldNames();
 
-                       List<String> names = new ArrayList(fieldDataTypes.size());
-                       List<TypeInfo> typeInfos = new ArrayList<>(fieldDataTypes.size());
+                       Map<String, DataType> nameToType = fieldsDataType.getFieldDataTypes();
+                       List<TypeInfo> typeInfos = new ArrayList<>(names.size());
 
-                       for (Map.Entry<String, DataType> e : fieldDataTypes.entrySet()) {
-                               names.add(e.getKey());
-                               typeInfos.add(toHiveTypeInfo(e.getValue()));
+                       for (String name : names) {
+                               typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
                        }
 
                        return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 56f0b41..bbd0b55 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.functions.hive.conversion;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 import org.apache.flink.table.types.DataType;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -77,7 +80,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspec
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -394,14 +400,40 @@ public class HiveInspectors {
                return getObjectInspector(typeInfo);
        }
 
+       /**
+        * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+        */
+       public static ObjectInspector getObjectInspector(DataType flinkType) {
+               return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
+       }
+
        private static ObjectInspector getObjectInspector(TypeInfo type) {
                switch (type.getCategory()) {
+
                        case PRIMITIVE:
                                PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
                                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+                       case LIST:
+                               ListTypeInfo listType = (ListTypeInfo) type;
+                               return ObjectInspectorFactory.getStandardListObjectInspector(
+                                               getObjectInspector(listType.getListElementTypeInfo()));
+                       case MAP:
+                               MapTypeInfo mapType = (MapTypeInfo) type;
+                               return ObjectInspectorFactory.getStandardMapObjectInspector(
+                                               getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+                       case STRUCT:
+                               StructTypeInfo structType = (StructTypeInfo) type;
+                               List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+                               List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+                               for (TypeInfo fieldType : fieldTypes) {
+                                       fieldInspectors.add(getObjectInspector(fieldType));
+                               }
+
+                               return ObjectInspectorFactory.getStandardStructObjectInspector(
+                                               structType.getAllStructFieldNames(), fieldInspectors);
                        default:
-                               throw new FlinkHiveUDFException(
-                                       String.format("TypeInfo %s is not supported yet", type));
+                               throw new CatalogException("Unsupported Hive type category " + type.getCategory());
                }
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index c880d54..b8f68a4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.batch.connectors.hive;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -31,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -50,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -127,21 +128,94 @@ public class HiveTableSinkTest {
                hiveCatalog.dropTable(tablePath, false);
        }
 
-       private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
+       @Test
+       public void testWriteComplexType() throws Exception {
+               String dbName = "default";
+               String tblName = "dest";
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
-               TableSchema tableSchema = new TableSchema(
-                               new String[]{"i", "l", "d", "s"},
-                               new TypeInformation[]{
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.LONG_TYPE_INFO,
-                                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                                               BasicTypeInfo.STRING_TYPE_INFO}
-               );
+
+               TableSchema.Builder builder = new TableSchema.Builder();
+               builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+                               DataTypes.ARRAY(DataTypes.INT()),
+                               DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+                               DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+               List<Row> toWrite = new ArrayList<>();
+               Row row = new Row(rowTypeInfo.getArity());
+               Object[] array = new Object[]{1, 2, 3};
+               Map<Integer, String> map = new HashMap<Integer, String>() {{
+                       put(1, "a");
+                       put(2, "b");
+               }};
+               Row struct = new Row(2);
+               struct.setField(0, 3);
+               struct.setField(1, "c");
+
+               row.setField(0, array);
+               row.setField(1, map);
+               row.setField(2, struct);
+               toWrite.add(row);
+
+               ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+               BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+               tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+               Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+               tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), rowTypeInfo,
+                               "default", "dest", HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
+               execEnv.execute();
+
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 2 3,1 a 2 b,3 c"));
+               hiveCatalog.dropTable(tablePath, false);
+
+               // nested complex types
+               builder = new TableSchema.Builder();
+               // array of rows
+               builder.fields(new String[]{"a"}, new DataType[]{DataTypes.ARRAY(
+                               DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())))});
+               rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+               row = new Row(rowTypeInfo.getArity());
+               array = new Object[3];
+               row.setField(0, array);
+               for (int i = 0; i < array.length; i++) {
+                       struct = new Row(2);
+                       struct.setField(0, 1 + i);
+                       struct.setField(1, String.valueOf((char) ('a' + i)));
+                       array[i] = struct;
+               }
+               toWrite.clear();
+               toWrite.add(row);
+
+               tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+               hiveTable = hiveCatalog.getHiveTable(tablePath);
+               tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), rowTypeInfo,
+                               "default", "dest", HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
+               execEnv.execute();
+
+               verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 a 2 b 3 c"));
+               hiveCatalog.dropTable(tablePath, false);
+       }
+
+       private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
                CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
-               hiveCatalog.createTable(tablePath, catalogTable, false);
+               hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
                return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
        }
 
+       private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
+               TableSchema.Builder builder = new TableSchema.Builder();
+               builder.fields(new String[]{"i", "l", "d", "s"},
+                               new DataType[]{
+                                               DataTypes.INT(),
+                                               DataTypes.BIGINT(),
+                                               DataTypes.DOUBLE(),
+                                               DataTypes.STRING()});
+               return createDestTable(dbName, tblName, builder.build(), numPartCols);
+       }
+
        private CatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
                if (numPartCols == 0) {
                        return new CatalogTableImpl(tableSchema, new HashMap<>(), "");
@@ -165,19 +239,25 @@ public class HiveTableSinkTest {
                return res;
        }
 
-       private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
+       private void verifyWrittenData(Path outputFile, List<Row> expectedRows, int numPartCols) throws Exception {
+               int[] fields = IntStream.range(0, expectedRows.get(0).getArity() - numPartCols).toArray();
+               List<String> expected = new ArrayList<>(expectedRows.size());
+               for (Row row : expectedRows) {
+                       expected.add(Row.project(row, fields).toString());
+               }
+               verifyWrittenData(outputFile, expected);
+       }
+
+       private void verifyWrittenData(Path outputFile, List<String> expected) throws Exception {
                FileSystem fs = outputFile.getFileSystem(hiveConf);
                assertTrue(fs.exists(outputFile));
-               int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
                        int numWritten = 0;
                        String line = reader.readLine();
                        while (line != null) {
-                               Row expectedRow = Row.project(expected.get(numWritten++), fields);
-                               assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
+                               assertEquals(expected.get(numWritten++), line.replaceAll("\u0001", ",").replaceAll("\u0002", " ").replaceAll("\u0003", " "));
                                line = reader.readLine();
                        }
-                       reader.close();
                        assertEquals(expected.size(), numWritten);
                }
        }
