lsyldliu commented on code in PR #20616:
URL: https://github.com/apache/flink/pull/20616#discussion_r949754180
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/HeapRowVector.java:
##########
@@ -46,4 +46,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}
+
+ @Override
+ public void reset() {
Review Comment:
After reading the Hive `MapColumnVector` and `ListColumnVector` code, I think we
should also override `reset()` in `HeapArrayVector` and `HeapMapVector`. What do
you think? Could you also do this in this PR?
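Roughly something like the sketch below (only illustrative — the `offsets`/`lengths`/`child` field names are my assumption based on the Hive-style list layout, please adapt to the actual fields of `HeapArrayVector`):
```java
    @Override
    public void reset() {
        // Reset the null bookkeeping in the parent class first (assuming the parent exposes reset()).
        super.reset();
        // Clear the per-row entry bookkeeping of the list vector (assumed Hive-style layout).
        java.util.Arrays.fill(offsets, 0);
        java.util.Arrays.fill(lengths, 0);
        // Also reset the nested element vector so stale element data is not reused across batches.
        if (child instanceof WritableColumnVector) {
            ((WritableColumnVector) child).reset();
        }
    }
```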
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
+ sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+ expectedRows = new ArrayList<>();
+ for (int i = 0; i < sortedRows.size(); i++) {
+ Row rowExpect = Row.copy(sortedRows.get(i));
+ Row nestedRow = (Row) rowExpect.getField(2);
+ if (nestedRow.getField(0) == null
+ && nestedRow.getField(1) == null
+ && nestedRow.getField(2) == null) {
+ rowExpect.setField(2, null);
+ }
+ expectedRows.add(rowExpect);
+ }
+ return expectedRows;
+ }
+
+    private static void insertToSinkAndCompare(StreamTableEnvironment tEnv, List<Row> expectedRows)
+            throws Exception {
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE sink_table (a int, b string,"
+ + "c struct<c1:string, c2:int, c3:int>,"
+ + "d map<string, string>, e array<string>, f string "
+ + ") "
+ + " stored as parquet"
+ + " TBLPROPERTIES ("
+                        + "'sink.partition-commit.policy.kind'='metastore,success-file',"
+ + "'auto-compaction'='true',"
+ + "'compaction.file-size' = '128MB',"
+ + "'sink.rolling-policy.file-size' = '1b'"
+ + ")");
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ String sql =
+                "insert into sink_table /*+ OPTIONS('sink.parallelism' = '3') */"
+ + " select * from my_table";
+ tEnv.executeSql(sql).await();
+        assertIterator(tEnv.executeSql("select * from sink_table").collect(), expectedRows);
+ }
+
+    private static void assertIterator(CloseableIterator<Row> iterator, List<Row> expectedRows)
+            throws Exception {
+ List<Row> result = CollectionUtil.iteratorToList(iterator);
+ iterator.close();
+ result.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+ for (int i = 0; i < result.size(); i++) {
Review Comment:
Asserting the whole list directly is OK here; there is no need to loop over the elements.
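For example, a minimal sketch (assuming JUnit's `assertEquals` is available in this test; the variable names follow the existing helper):
```java
        List<Row> result = CollectionUtil.iteratorToList(iterator);
        iterator.close();
        result.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
        // One assertion over the whole list instead of comparing element by element.
        assertEquals(expectedRows, result);
```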
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
Review Comment:
```suggestion
List<Row> sortedRows = new ArrayList<>();
```
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
+ sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+ expectedRows = new ArrayList<>();
Review Comment:
```suggestion
List<Row> expectedRows = new ArrayList<>();
```
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
Review Comment:
Why is the expected row list twice as large as the input?
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
Review Comment:
```suggestion
List<Row> expectedRows = generateExpectedRows(rows);
```
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
+ sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+ expectedRows = new ArrayList<>();
+ for (int i = 0; i < sortedRows.size(); i++) {
+ Row rowExpect = Row.copy(sortedRows.get(i));
+ Row nestedRow = (Row) rowExpect.getField(2);
+ if (nestedRow.getField(0) == null
+ && nestedRow.getField(1) == null
+ && nestedRow.getField(2) == null) {
+ rowExpect.setField(2, null);
+ }
+ expectedRows.add(rowExpect);
+ }
+ return expectedRows;
+ }
+
+    private static void insertToSinkAndCompare(StreamTableEnvironment tEnv, List<Row> expectedRows)
Review Comment:
How about naming this `executeAndAssert`?
##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java:
##########
@@ -963,6 +971,121 @@ public void testStreamReadWithProjectPushDown() throws Exception {
result.getJobClient().get().cancel();
}
+ @Test(timeout = 120000)
+ public void testReadParquetWithNullableComplexType() throws Exception {
+ List<Row> expectedRows;
+ final String catalogName = "hive";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+ env.enableCheckpointing(100);
+        StreamTableEnvironment tEnv =
+                HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ tEnv.useCatalog(catalogName);
+
+ List<Row> rows = generateRows();
+ expectedRows = generateExpectedRows(rows);
+ DataStream<Row> stream =
+ env.addSource(
+ new FiniteTestSource<>(rows),
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    Types.INT,
+                                    Types.STRING,
+                                    new RowTypeInfo(
+                                            new TypeInformation[] {
+                                                Types.STRING, Types.INT, Types.INT
+                                            },
+                                            new String[] {"c1", "c2", "c3"}),
+                                    new MapTypeInfo<>(Types.STRING, Types.STRING),
+                                    Types.OBJECT_ARRAY(Types.STRING),
+                                    Types.STRING
+                                },
+                                new String[] {"a", "b", "c", "d", "e", "f"}))
+ .filter((FilterFunction<Row>) value -> true)
+ .setParallelism(3); // to parallel tasks
+
+ tEnv.createTemporaryView("my_table", stream);
+ insertToSinkAndCompare(tEnv, expectedRows);
+ }
+
+ private static List<Row> generateRows() {
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10000; i++) {
+ Map<String, String> e = new HashMap<>();
+ e.put(i + "", i % 2 == 0 ? null : i + "");
+ String[] f = new String[2];
+ f[0] = i % 3 == 0 ? null : i + "";
+ f[1] = i % 3 == 2 ? null : i + "";
+ rows.add(
+ Row.of(
+ i,
+ String.valueOf(i % 10),
+ Row.of(
+ i % 2 == 0 ? null : String.valueOf(i % 10),
+ i % 3 == 0 ? null : i % 10,
+ i % 5 == 0 ? null : i % 10),
+ e,
+ f,
+ String.valueOf(i % 10)));
+ }
+ return rows;
+ }
+
+ private static List<Row> generateExpectedRows(List<Row> rows) {
+ List<Row> sortedRows, expectedRows;
+ sortedRows = new ArrayList<>();
+ sortedRows.addAll(rows);
+ sortedRows.addAll(rows);
+ sortedRows.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+
+ expectedRows = new ArrayList<>();
+ for (int i = 0; i < sortedRows.size(); i++) {
+ Row rowExpect = Row.copy(sortedRows.get(i));
+ Row nestedRow = (Row) rowExpect.getField(2);
+ if (nestedRow.getField(0) == null
+ && nestedRow.getField(1) == null
+ && nestedRow.getField(2) == null) {
+ rowExpect.setField(2, null);
+ }
+ expectedRows.add(rowExpect);
+ }
+ return expectedRows;
+ }
+
+    private static void insertToSinkAndCompare(StreamTableEnvironment tEnv, List<Row> expectedRows)
+            throws Exception {
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE sink_table (a int, b string,"
+ + "c struct<c1:string, c2:int, c3:int>,"
+ + "d map<string, string>, e array<string>, f string "
+ + ") "
+ + " stored as parquet"
+ + " TBLPROPERTIES ("
+                        + "'sink.partition-commit.policy.kind'='metastore,success-file',"
+ + "'auto-compaction'='true',"
+ + "'compaction.file-size' = '128MB',"
+ + "'sink.rolling-policy.file-size' = '1b'"
+ + ")");
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ String sql =
+                "insert into sink_table /*+ OPTIONS('sink.parallelism' = '3') */"
+ + " select * from my_table";
+ tEnv.executeSql(sql).await();
+        assertIterator(tEnv.executeSql("select * from sink_table").collect(), expectedRows);
+ }
+
+    private static void assertIterator(CloseableIterator<Row> iterator, List<Row> expectedRows)
Review Comment:
How about naming this `assertResults`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]