Airblader commented on a change in pull request #15428:
URL: https://github.com/apache/flink/pull/15428#discussion_r603916451
##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
##########
@@ -214,10 +227,71 @@ private static Executor lookupExecutor(
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream) {
-        JavaDataStreamQueryOperation<T> queryOperation =
-                asQueryOperation(dataStream, Optional.empty());
+        return fromDataStreamInternal(dataStream, null, null);
+    }

-        return createTable(queryOperation);
+    @Override
+    public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
+        Preconditions.checkNotNull(schema, "Schema must not be null.");
+        return fromDataStreamInternal(dataStream, schema, null);
+    }
+
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
+        createTemporaryView(path, fromDataStreamInternal(dataStream, null, path));
+    }
+
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema) {
+        createTemporaryView(path, fromDataStreamInternal(dataStream, schema, path));
+    }
+
+    private <T> Table fromDataStreamInternal(
+            DataStream<T> dataStream, @Nullable Schema schema, @Nullable String viewPath) {
+        Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
+        final CatalogManager catalogManager = getCatalogManager();
+        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
+
+        final UnresolvedIdentifier unresolvedIdentifier;
+        if (viewPath != null) {
+            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
+        } else {
+            unresolvedIdentifier =
+                    UnresolvedIdentifier.of("Unregistered_DataStream_" + dataStream.getId());
+        }
+        final ObjectIdentifier objectIdentifier =
+                catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        final ExternalSchemaTranslator.InputResult schemaTranslationResult =
+                ExternalSchemaTranslator.fromExternal(
+                        catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
+
+        final ResolvedSchema resolvedSchema =
+                schemaTranslationResult.getSchema().resolve(schemaResolver);
+
+        final QueryOperation scanOperation =
+                new JavaExternalQueryOperation<>(
+                        objectIdentifier,
+                        dataStream,
+                        schemaTranslationResult.getPhysicalDataType(),
+                        schemaTranslationResult.isTopLevelRecord(),
+                        ChangelogMode.insertOnly(),
+                        resolvedSchema);
+
+        final List<String> projections = schemaTranslationResult.getProjections();
+        if (projections == null) {
Review comment:
`getProjections` is declared nullable, but at least `fromExternal`
doesn't seem to make use of that. Does it make sense to include emptiness in
this condition to avoid wrapping it unnecessarily?
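
For illustration, something like the following (a rough sketch reusing the names from the diff above; the early-return body is an assumption, not necessarily this PR's code):

```java
// Hedged sketch: treat an empty projection list like null, so the scan
// operation is not wrapped when there is nothing to project.
final List<String> projections = schemaTranslationResult.getProjections();
if (projections == null || projections.isEmpty()) {
    return createTable(scanOperation);
}
```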
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java
##########
@@ -155,6 +245,117 @@ public static boolean isSourceChangeEventsDuplicate(
     // --------------------------------------------------------------------------------------------
+    /** Creates a specialized node for assigning watermarks. */
+    private static void pushWatermarkAssigner(FlinkRelBuilder relBuilder, ResolvedSchema schema) {
+        final ExpressionConverter converter = new ExpressionConverter(relBuilder);
+        final RelDataType inputRelDataType = relBuilder.peek().getRowType();
+
+        final WatermarkSpec watermarkSpec = schema.getWatermarkSpecs().get(0);
Review comment:
Should we validate here somehow that only one watermark spec exists
(since only one is supported)?
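
A minimal sketch of such a check (placement and message wording are assumptions):

```java
// Hedged sketch: fail fast if anything other than exactly one watermark
// spec is defined, since only a single one is supported here.
final List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
Preconditions.checkState(
        watermarkSpecs.size() == 1,
        "Exactly one watermark spec is expected, but %s were found.",
        watermarkSpecs.size());
final WatermarkSpec watermarkSpec = watermarkSpecs.get(0);
```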
##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
##########
@@ -202,15 +209,152 @@ static StreamTableEnvironment create(
     /**
      * Converts the given {@link DataStream} into a {@link Table}.
      *
-     * <p>The field names of the {@link Table} are automatically derived from the type of the {@link
-     * DataStream}.
+     * <p>Column names and types of the {@link Table} are automatically derived from the {@link
+     * TypeInformation} of the {@link DataStream}. If the outermost record's {@link TypeInformation}
+     * is a {@link CompositeType}, it will be flattened in the first level. {@link TypeInformation}
+     * that cannot be represented as one of the listed {@link DataTypes} will be treated as a
+     * black-box {@link DataTypes#RAW(Class, TypeSerializer)} type. Thus, composite nested fields
+     * will not be accessible.
+     *
+     * <p>Since the DataStream API does not support changelog processing natively, this method
+     * assumes append-only/insert-only semantics during the stream-to-table conversion. Records of
+     * type {@link Row} must describe {@link RowKind#INSERT} changes.
+     *
+     * <p>By default, the stream record's timestamp and watermarks are not propagated unless
Review comment:
I assume you're just documenting the existing behavior here, but the
saner default would be to propagate them, right? I would assume this detail
makes this signature significantly less useful.
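
For reference, opting in on the caller side would presumably go through the schema-based overload, along these lines (a sketch assuming a `StreamTableEnvironment tableEnv` and a `DataStream<?> dataStream`; the `rowtime` metadata column and `SOURCE_WATERMARK()` follow the design in this PR, but the exact snippet is an assumption):

```java
// Hedged sketch: propagate the stream record's timestamps and watermarks
// explicitly via the schema-based fromDataStream overload.
Table table =
        tableEnv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                        // expose the stream record's timestamp as a metadata column
                        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                        // reuse the watermarks generated in the DataStream API
                        .watermark("rowtime", "SOURCE_WATERMARK()")
                        .build());
```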
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/ExternalDynamicSource.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.source.InputConversionOperator;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Table source for connecting to the external {@link DataStream} API. */
+@Internal
+final class ExternalDynamicSource<E>
+        implements ScanTableSource, SupportsReadingMetadata, SupportsSourceWatermark {
+
+    private static final String ROWTIME_METADATA_KEY = "rowtime";
+
+    private static final DataType ROWTIME_METADATA_DATA_TYPE =
+            DataTypes.TIMESTAMP_LTZ(3).notNull();
+
+    private final ObjectIdentifier identifier;
+
+    private final DataStream<E> dataStream;
+
+    private final DataType physicalDataType;
+
+    private final boolean isTopLevelRecord;
+
+    private final ChangelogMode changelogMode;
+
+    // mutable attributes
+
+    private boolean attachRowtime;
+
+    private boolean propagateWatermark;
+
+    ExternalDynamicSource(
+            ObjectIdentifier identifier,
+            DataStream<E> dataStream,
+            DataType physicalDataType,
+            boolean isTopLevelRecord,
+            ChangelogMode changelogMode) {
+        this.identifier = identifier;
+        this.dataStream = dataStream;
+        this.physicalDataType = physicalDataType;
+        this.isTopLevelRecord = isTopLevelRecord;
+        this.changelogMode = changelogMode;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        final ExternalDynamicSource<E> copy =
+                new ExternalDynamicSource<>(
+                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+        copy.attachRowtime = attachRowtime;
+        copy.propagateWatermark = propagateWatermark;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return null;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return changelogMode;
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        final DataStructureConverter physicalConverter =
+                runtimeProviderContext.createDataStructureConverter(physicalDataType);
+
+        final Transformation<E> externalTransformation = dataStream.getTransformation();
+
+        final Transformation<RowData> conversionTransformation =
+                new OneInputTransformation<>(
+                        externalTransformation,
+                        generateOperatorName(),
+                        new InputConversionOperator<>(
+                                physicalConverter,
+                                !isTopLevelRecord,
+                                attachRowtime,
+                                propagateWatermark,
+                                changelogMode.containsOnly(RowKind.INSERT)),
+                        null, // will be filled by the framework
+                        externalTransformation.getParallelism());
+
+        return TransformationScanProvider.of(conversionTransformation, false);
+    }
+
+    private String generateOperatorName() {
+        return String.format(
+                "DataStreamToTable(stream=%s, type=%s, rowtime=%s, watermark=%s)",
+                identifier.asSummaryString(),
+                physicalDataType.toString(),
+                attachRowtime,
+                propagateWatermark);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Collections.singletonMap(ROWTIME_METADATA_KEY, ROWTIME_METADATA_DATA_TYPE);
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        if (metadataKeys.contains(ROWTIME_METADATA_KEY)) {
Review comment:
Maybe this is more for my understanding, but can this be called multiple
times and if so, do we intentionally do it such that `attachRowtime` will never
be unset again?
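
If repeated calls should always reflect the latest key set, a direct assignment would avoid the one-way latch, e.g. (a sketch, not this PR's code):

```java
// Hedged sketch: derive the flag from the given keys on every call, so a
// later call without "rowtime" also unsets it again.
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
    attachRowtime = metadataKeys.contains(ROWTIME_METADATA_KEY);
}
```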
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/ExternalDynamicSource.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.source.InputConversionOperator;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Table source for connecting to the external {@link DataStream} API. */
+@Internal
+final class ExternalDynamicSource<E>
+        implements ScanTableSource, SupportsReadingMetadata, SupportsSourceWatermark {
+
+    private static final String ROWTIME_METADATA_KEY = "rowtime";
+
+    private static final DataType ROWTIME_METADATA_DATA_TYPE =
+            DataTypes.TIMESTAMP_LTZ(3).notNull();
+
+    private final ObjectIdentifier identifier;
+
+    private final DataStream<E> dataStream;
+
+    private final DataType physicalDataType;
+
+    private final boolean isTopLevelRecord;
+
+    private final ChangelogMode changelogMode;
+
+    // mutable attributes
+
+    private boolean attachRowtime;
+
+    private boolean propagateWatermark;
+
+    ExternalDynamicSource(
+            ObjectIdentifier identifier,
+            DataStream<E> dataStream,
+            DataType physicalDataType,
+            boolean isTopLevelRecord,
+            ChangelogMode changelogMode) {
+        this.identifier = identifier;
+        this.dataStream = dataStream;
+        this.physicalDataType = physicalDataType;
+        this.isTopLevelRecord = isTopLevelRecord;
+        this.changelogMode = changelogMode;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        final ExternalDynamicSource<E> copy =
+                new ExternalDynamicSource<>(
+                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+        copy.attachRowtime = attachRowtime;
+        copy.propagateWatermark = propagateWatermark;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return null;
Review comment:
Missing summary :-)
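
A minimal sketch of a possible summary (the wording is an assumption):

```java
// Hedged sketch: return a human-readable description instead of null.
@Override
public String asSummaryString() {
    return "ExternalDataStream(identifier=" + identifier.asSummaryString() + ")";
}
```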
##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
##########
@@ -214,10 +227,71 @@ private static Executor lookupExecutor(
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream) {
-        JavaDataStreamQueryOperation<T> queryOperation =
-                asQueryOperation(dataStream, Optional.empty());
+        return fromDataStreamInternal(dataStream, null, null);
+    }

-        return createTable(queryOperation);
+    @Override
+    public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
+        Preconditions.checkNotNull(schema, "Schema must not be null.");
+        return fromDataStreamInternal(dataStream, schema, null);
+    }
+
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
+        createTemporaryView(path, fromDataStreamInternal(dataStream, null, path));
+    }
+
+    @Override
+    public <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema) {
+        createTemporaryView(path, fromDataStreamInternal(dataStream, schema, path));
+    }
+
+    private <T> Table fromDataStreamInternal(
+            DataStream<T> dataStream, @Nullable Schema schema, @Nullable String viewPath) {
+        Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
+        final CatalogManager catalogManager = getCatalogManager();
+        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
+
+        final UnresolvedIdentifier unresolvedIdentifier;
+        if (viewPath != null) {
+            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
+        } else {
+            unresolvedIdentifier =
+                    UnresolvedIdentifier.of("Unregistered_DataStream_" + dataStream.getId());
+        }
+        final ObjectIdentifier objectIdentifier =
+                catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        final ExternalSchemaTranslator.InputResult schemaTranslationResult =
+                ExternalSchemaTranslator.fromExternal(
+                        catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
+
+        final ResolvedSchema resolvedSchema =
+                schemaTranslationResult.getSchema().resolve(schemaResolver);
+
+        final QueryOperation scanOperation =
+                new JavaExternalQueryOperation<>(
+                        objectIdentifier,
+                        dataStream,
+                        schemaTranslationResult.getPhysicalDataType(),
+                        schemaTranslationResult.isTopLevelRecord(),
+                        ChangelogMode.insertOnly(),
+                        resolvedSchema);
+
+        final List<String> projections = schemaTranslationResult.getProjections();
+        if (projections == null) {
Review comment:
Ah, never mind, you also updated `fromExternal` to actually return
`null` now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]