lindong28 commented on code in PR #22262:
URL: https://github.com/apache/flink/pull/22262#discussion_r1148889544
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -377,6 +377,30 @@ public void testFromAndToDataStreamEventTime() throws Exception {
Row.of("c", 1000));
}
+ @Test
+ public void testFromAndToDataStreamBypassConversion() throws Exception {
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ DataStream<Row> rowStream = env.fromElements(Row.of(1L, "a"));
+ Table table = tableEnv.fromDataStream(rowStream);
+ DataStream<Row> convertedDataStream = tableEnv.toDataStream(table);
+
+ assertThat(rowStream).isEqualTo(convertedDataStream);
+
+ testResult(convertedDataStream, Row.of(1L, "a"));
+ }
+
+ @Test
+ public void testFromAndToDataStreamWithoutRow() throws Exception {
Review Comment:
Would it be simpler to remove this test?
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java:
##########
@@ -223,6 +228,28 @@ public DataStream<Row> toDataStream(Table table) {
Preconditions.checkNotNull(table, "Table must not be null.");
// include all columns of the query (incl. metadata and computed columns)
final DataType sourceType = table.getResolvedSchema().toSourceRowDataType();
+
+ if (!(table.getQueryOperation() instanceof ExternalQueryOperation)) {
+ return toDataStream(table, sourceType);
+ }
+
+ DataTypeFactory dataTypeFactory = getCatalogManager().getDataTypeFactory();
+ SchemaResolver schemaResolver = getCatalogManager().getSchemaResolver();
+ ExternalQueryOperation<?> queryOperation =
+ (ExternalQueryOperation<?>) table.getQueryOperation();
+ DataStream<?> dataStream = queryOperation.getDataStream();
+
+ SchemaTranslator.ConsumingResult consumingResult =
+ SchemaTranslator.createConsumingResult(dataTypeFactory, dataStream.getType(), null);
+ ResolvedSchema defaultSchema = consumingResult.getSchema().resolve(schemaResolver);
+
+ if (queryOperation.getChangelogMode().equals(ChangelogMode.insertOnly())
+ && table.getResolvedSchema().equals(defaultSchema)
+ && consumingResult.getProjections() == null
Review Comment:
Would it be simpler to remove this line?
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java:
##########
@@ -454,6 +454,12 @@ Table fromChangelogStream(
* <p>If the input table contains a single rowtime column, it will be propagated into a stream
* record's timestamp. Watermarks will be propagated as well.
*
+ * <p>Specifically, if the input table wraps around and describes a relational operation that
Review Comment:
Would it be simpler to say this:
```
* <p>If the input table is created by {@link #fromDataStream(DataStream)}, the original
* DataStream is returned.
*
* <p>If the input table contains a single rowtime column, it will be propagated into a stream
* record's timestamp. Watermarks will be propagated as well.
```
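For context, the short-circuit the patch implements (return the original `DataStream` from `toDataStream` when the table is an untransformed `ExternalQueryOperation` wrapper) can be sketched without any Flink dependency. All names below are hypothetical stand-ins, not Flink API: a wrapper only hands back the original wrapped object when no projection or changelog transformation was applied on top of it.

```java
import java.util.Optional;

/**
 * Minimal Flink-free sketch of the bypass discussed above: a table-like
 * wrapper short-circuits back to the original stream only when it is a
 * pure pass-through (hypothetical names, not Flink API).
 */
final class WrappedStream<T> {
    private final T original;          // stands in for the source DataStream
    private final boolean transformed; // stands in for projections / changelog changes

    WrappedStream(T original, boolean transformed) {
        this.original = original;
        this.transformed = transformed;
    }

    /** Returns the original stream only if the wrapper applied no transformation. */
    Optional<T> tryBypass() {
        return transformed ? Optional.empty() : Optional.of(original);
    }
}

public class BypassSketch {
    public static void main(String[] args) {
        Object stream = new Object();
        WrappedStream<Object> passThrough = new WrappedStream<>(stream, false);
        WrappedStream<Object> projected = new WrappedStream<>(stream, true);

        // Pass-through wrapper yields the identical original object back.
        System.out.println(passThrough.tryBypass().get() == stream); // true
        // A transformed wrapper must take the full conversion path instead.
        System.out.println(projected.tryBypass().isPresent());       // false
    }
}
```

This mirrors the condition in the diff: the bypass only fires when the changelog mode is insert-only and the schema was not changed relative to the default derived from the stream.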
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]