lindong28 commented on code in PR #22262:
URL: https://github.com/apache/flink/pull/22262#discussion_r1148889544


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +377,30 @@ public void testFromAndToDataStreamEventTime() throws 
Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassConversion() throws Exception {
+        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+
+        DataStream<Row> rowStream = env.fromElements(Row.of(1L, "a"));
+        Table table = tableEnv.fromDataStream(rowStream);
+        DataStream<Row> convertedDataStream = tableEnv.toDataStream(table);
+
+        assertThat(rowStream).isEqualTo(convertedDataStream);
+
+        testResult(convertedDataStream, Row.of(1L, "a"));
+    }
+
+    @Test
+    public void testFromAndToDataStreamWithoutRow() throws Exception {

Review Comment:
   Would it be simpler to remove this test?



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java:
##########
@@ -223,6 +228,28 @@ public DataStream<Row> toDataStream(Table table) {
         Preconditions.checkNotNull(table, "Table must not be null.");
         // include all columns of the query (incl. metadata and computed 
columns)
         final DataType sourceType = 
table.getResolvedSchema().toSourceRowDataType();
+
+        if (!(table.getQueryOperation() instanceof ExternalQueryOperation)) {
+            return toDataStream(table, sourceType);
+        }
+
+        DataTypeFactory dataTypeFactory = 
getCatalogManager().getDataTypeFactory();
+        SchemaResolver schemaResolver = 
getCatalogManager().getSchemaResolver();
+        ExternalQueryOperation<?> queryOperation =
+                (ExternalQueryOperation<?>) table.getQueryOperation();
+        DataStream<?> dataStream = queryOperation.getDataStream();
+
+        SchemaTranslator.ConsumingResult consumingResult =
+                SchemaTranslator.createConsumingResult(dataTypeFactory, 
dataStream.getType(), null);
+        ResolvedSchema defaultSchema = 
consumingResult.getSchema().resolve(schemaResolver);
+
+        if 
(queryOperation.getChangelogMode().equals(ChangelogMode.insertOnly())
+                && table.getResolvedSchema().equals(defaultSchema)
+                && consumingResult.getProjections() == null

Review Comment:
   Would it be simpler to remove this line?



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java:
##########
@@ -454,6 +454,12 @@ Table fromChangelogStream(
      * <p>If the input table contains a single rowtime column, it will be 
propagated into a stream
      * record's timestamp. Watermarks will be propagated as well.
      *
+     * <p>Specifically, if the input table wraps around and describes a 
relational operation that

Review Comment:
   Would it be simpler to say this:
   
   ```
    * <p>If the input table is created by {@link #fromDataStream(DataStream)}}, 
the original
    * DataStream is returned.
    *
    * <p>If the input table contains a single rowtime column, it will be 
propagated into a stream
    * record's timestamp. Watermarks will be propagated as well.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to