yunfengzhou-hub commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1125931457


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
+
+        Table table = tableEnv.fromDataStream(dataStream);
+        DataStream<Row> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+
+        // access rows by default name
+        DataStream<Row> transformedDataStream =
+                convertedDataStream.map(
+                        (MapFunction<Row, Row>)
+                                value -> Row.of(value.getField("f0"), value.getField("f1")),
+                        typeInfo);
+
+        testResult(transformedDataStream, Row.of(1L, "a"));
+
+        // test datastreams of row with non-default name
+        final RowTypeInfo typeInfoWithColNames =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.LONG, Types.STRING},
+                        new String[] {"col0", "col1"});
+
+        final DataStream<Row> dataStreamWithFieldName =
+                env.addSource(rowGenerator, typeInfoWithColNames);

Review Comment:
   nit: Let's unify the naming of `dataStreamWithFieldName` and `typeInfoWithColNames`; use either `fieldName` or `colNames` in both cases.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name

Review Comment:
   nit: This part should be testing the default names, and the following part the non-default names.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {

Review Comment:
   nit: This `final` keyword can be removed.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -310,9 +315,12 @@ public void testFromAndToDataStreamWithRaw() throws Exception {
                 table.execute(),
                 Row.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
                 Row.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
-        testResult(
-                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType())),
-                rawRecords.toArray(new Tuple2[0]));
+
+        final DataStream<Tuple2<DayOfWeek, ZoneOffset>> resultDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+        assertEquals(dataStream, resultDataStream);

Review Comment:
   Let's use `assertThat(...).isEqualTo(...)`, as in the other test cases.
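   For example, something like:
   ```java
   assertThat(resultDataStream).isEqualTo(dataStream);
   ```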



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
+
+        Table table = tableEnv.fromDataStream(dataStream);
+        DataStream<Row> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+
+        // access rows by default name
+        DataStream<Row> transformedDataStream =
+                convertedDataStream.map(

Review Comment:
   This transformation can also be removed.
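   If it is removed, the result could be verified directly on the converted stream, e.g.:
   ```java
   testResult(convertedDataStream, Row.of(1L, "a"));
   ```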



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));

Review Comment:
   It seems that Tuples are not treated as POJOs in Flink's serialization [1]. Let's rename this test case to correctly reflect this, and maybe add another test case for POJOs.
   
   [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/
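   For reference, a minimal POJO sketch for such an additional test (class and field names are just an illustration):
   ```java
   /** A Flink POJO: public class, public fields, and a public no-arg constructor. */
   public static class TestPojo {
       public long f0;
       public String f1;

       public TestPojo() {}

       public TestPojo(long f0, String f1) {
           this.f0 = f0;
           this.f1 = f1;
       }
   }
   ```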



##########
flink-core/src/test/java/org/apache/flink/types/RowTest.java:
##########
@@ -133,6 +134,18 @@ public void testRowPositioned() {
         assertThat(row.getField(1), equalTo(true));
         assertThat(row.getField(2), equalTo("Hello"));
 
+        // test accessing positioned row by default name
+        assertEquals(13, row.getField("f0"));
+        assertTrue((boolean) row.getField("f1"));
+        assertEquals("Hello", row.getField("f2"));

Review Comment:
   nit: Let's use `assertThat` in these verifications.
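   For example, matching the style of the surrounding assertions:
   ```java
   assertThat(row.getField("f0"), equalTo(13));
   assertThat(row.getField("f1"), equalTo(true));
   assertThat(row.getField("f2"), equalTo("Hello"));
   ```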



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {

Review Comment:
   Let's add test cases for the DataStream's watermark property as well.
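   For example, a sketch of such a case (the watermark strategy and timestamp assigner below are only illustrative):
   ```java
   DataStream<Tuple2<Long, String>> streamWithWatermarks =
           dataStream.assignTimestampsAndWatermarks(
                   WatermarkStrategy.<Tuple2<Long, String>>forMonotonousTimestamps()
                           .withTimestampAssigner((tuple, ts) -> tuple.f0));
   ```
   The test could then check whether the watermark is preserved after a fromDataStream/toDataStream round trip.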



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});

Review Comment:
   The explicit array creation `new TypeInformation[] {...}` can be removed, since `RowTypeInfo` also accepts the types as varargs.
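   I.e., something like:
   ```java
   final RowTypeInfo typeInfo = new RowTypeInfo(Types.LONG, Types.STRING);
   ```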



##########
flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java:
##########
@@ -308,6 +314,46 @@ protected void validateTimeCharacteristic(boolean isRowtimeDefined) {
         }
     }
 
+    protected <T> DataStream<T> bypassTableConversion(
+            Table table, AbstractDataType<?> targetDataType, DataTypeFactory dataTypeFactory) {
+        final boolean hasWatermark =
+                CollectionUtils.isNotEmpty(table.getResolvedSchema().getWatermarkSpecs());
+        final boolean hasPrimaryKey = table.getResolvedSchema().getPrimaryKey().isPresent();
+
+        final QueryOperation queryOperation = table.getQueryOperation();
+
+        if (!hasWatermark && !hasPrimaryKey && queryOperation instanceof ExternalQueryOperation) {
+            final ExternalQueryOperation<T> externalQueryOperation =
+                    (ExternalQueryOperation<T>) queryOperation;
+            final DataStream<T> dataStream = externalQueryOperation.getDataStream();
+            final TypeInformation<T> typeInfo = dataStream.getType();
+
+            final DataType inputDataType =
+                    TypeInfoDataTypeConverter.toDataType(dataTypeFactory, typeInfo);
+            final DataType outputDataType = dataTypeFactory.createDataType(targetDataType);
+
+            final boolean objectReuseEnabled = this.executionEnvironment.getConfig().isObjectReuseEnabled();

Review Comment:
   Let's run `mvn spotless:apply` to fix coding style issues like this line.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);

Review Comment:
   It seems that this test case does not require parallelism = 1 to pass. Let's remove this. The same applies to the other test cases.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

Review Comment:
   It seems that `env.fromCollection(...)` is enough. Let's simplify this part of the code.
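   For example (illustrative):
   ```java
   DataStream<Row> dataStream =
           env.fromCollection(Collections.singletonList(Row.of(1L, "a")), typeInfo);
   ```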



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
