gaoyunhaii commented on code in PR #20472:
URL: https://github.com/apache/flink/pull/20472#discussion_r939545370
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
// the boundedness has been checked via the runtime provider already, so we can safely
// declare all legacy transformations as bounded to make the stream graph generator happy
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
- return transformation;
+
+ // no dynamic filtering applied
+ if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {
+ return transformation;
+ }
+
+ // handle dynamic filtering
+ Preconditions.checkArgument(getInputEdges().size() == 1);
+ BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource();
+ if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+ throw new TableException(
+ "The source input must be BatchExecDynamicFilteringDataCollector now");
Review Comment:
nit: now -> for now
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> {
+ private final TypeSerializer<RowData> serializer;
+
+ private final List<byte[]> serializedElements;
+
+ public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ Preconditions.checkState(serializer != null, "serializer not set");
+ this.serializedElements = serializeElements(elements, serializer);
+ this.serializer = serializer;
+ }
+
+ private List<byte[]> serializeElements(
+ Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ List<byte[]> serializeElements = new ArrayList<>();
+
+ for (RowData element : elements) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
Review Comment:
Could be wrapped with try-with-resources.
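For reference, a minimal sketch of the shape the reviewer is asking for, reusing the names from the diff above (a hypothetical rewrite, not the final PR code):

```java
// Hypothetical rewrite of the serialization loop using try-with-resources.
for (RowData element : elements) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
        serializer.serialize(element, wrapper);
        serializeElements.add(baos.toByteArray());
    } catch (Exception e) {
        throw new TableException(
                "Serializing the source elements failed: " + e.getMessage(), e);
    }
}
```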
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.DynamicFilteringValuesSourceEnumerator;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Source} implementation that reads data from a partitioned list.
+ *
+ * <p>This source is useful for dynamic filtering testing.
+ */
+public class DynamicFilteringValuesSource
+ implements Source<RowData, ValuesSourcePartitionSplit, NoOpEnumState> {
+
+ private final TypeSerializer<RowData> serializer;
+ private Map<Map<String, String>, byte[]> serializedElements;
+ private Map<Map<String, String>, Integer> counts;
+ private final List<String> dynamicFilteringFields;
+
+ public DynamicFilteringValuesSource(
+ Map<Map<String, String>, Collection<RowData>> elements,
+ TypeSerializer<RowData> serializer,
+ List<String> dynamicFilteringFields) {
+ this.serializer = serializer;
+ this.dynamicFilteringFields = dynamicFilteringFields;
+ serializeElements(serializer, elements);
+ }
+
+ private void serializeElements(
+ TypeSerializer<RowData> serializer,
+ Map<Map<String, String>, Collection<RowData>> elements) {
+ Preconditions.checkState(serializer != null, "serializer not set");
+
+ serializedElements = new HashMap<>();
+ counts = new HashMap<>();
+ for (Map<String, String> partition : elements.keySet()) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
Review Comment:
Could also be wrapped with try-with-resources.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Batch {@link ExecNode} that collects inputs and builds {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringData}, and then sends the {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringEvent} to the source coordinator.
+ */
+public class BatchExecDynamicFilteringDataCollector extends ExecNodeBase<Object>
+ implements BatchExecNode<Object> {
+
+ @Experimental
+ public static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =
+ key("table.exec.dynamic-filtering.threshold")
+ .memoryType()
+ .defaultValue(MemorySize.parse("32 mb"))
Review Comment:
Might decrease the size to 8 MB, since AkkaOptions#FRAMESIZE is 10 MB by default.
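If the default is lowered as suggested, the option could look roughly like the following (a sketch only; the final value and the option's description are still up to the PR discussion):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;

import static org.apache.flink.configuration.ConfigOptions.key;

class DynamicFilteringThresholdSketch {
    // Hypothetical variant of the option with an 8 MB default, kept below the
    // default Akka frame size of 10 MB.
    static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =
            key("table.exec.dynamic-filtering.threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("8 mb"));
}
```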
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
// the boundedness has been checked via the runtime provider already, so we can safely
// declare all legacy transformations as bounded to make the stream graph generator happy
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
- return transformation;
+
+ // no dynamic filtering applied
+ if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {
+ return transformation;
+ }
+
+ // handle dynamic filtering
+ Preconditions.checkArgument(getInputEdges().size() == 1);
+ BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource();
+ if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+ throw new TableException(
+ "The source input must be BatchExecDynamicFilteringDataCollector now");
+ }
+ BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =
+ (BatchExecDynamicFilteringDataCollector) input;
+
+ ((SourceTransformation<?, ?, ?>) transformation)
+ .setCoordinatorListeningID(dynamicFilteringDataListenerID);
+
+ // Must use translateToPlan to avoid duplication dynamic filters.
+ Transformation<Object> dynamicFilteringTransform =
+ dynamicFilteringDataCollector.translateToPlan(planner);
+ ((DynamicFilteringDataCollectorOperatorFactory)
+ ((OneInputTransformation<?, ?>) dynamicFilteringTransform)
+ .getOperatorFactory())
+ .registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID);
+
+ if (!needDynamicFilteringDependency) {
+ planner.addExtraTransformation(dynamicFilteringTransform);
+ return transformation;
+ } else {
+ MultipleInputTransformation<RowData> multipleInputTransformation =
+ new MultipleInputTransformation<>(
+ "Placeholder-Filter",
Review Comment:
Might rename to `Order-Enforcer`
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
// the boundedness has been checked via the runtime provider already, so we can safely
// declare all legacy transformations as bounded to make the stream graph generator happy
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
- return transformation;
+
+ // no dynamic filtering applied
+ if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {
+ return transformation;
+ }
+
+ // handle dynamic filtering
+ Preconditions.checkArgument(getInputEdges().size() == 1);
Review Comment:
Seems better to use `checkState`, since the given parameter is not an argument.
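A small standalone illustration of the difference (hypothetical example, not the PR code): `checkState` throws IllegalStateException, which matches a violated plan invariant, while `checkArgument` throws IllegalArgumentException, which implies a bad caller-supplied argument.

```java
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.List;

class CheckStateSketch {
    // Hypothetical stand-in for getInputEdges(): this is internal plan state,
    // not something the caller passes in.
    private final List<Object> inputEdges = Collections.singletonList(new Object());

    void validate() {
        // Fails with IllegalStateException if the invariant is broken.
        Preconditions.checkState(inputEdges.size() == 1);
    }
}
```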
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> {
+ private final TypeSerializer<RowData> serializer;
+
+ private final List<byte[]> serializedElements;
+
+ public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ Preconditions.checkState(serializer != null, "serializer not set");
+ this.serializedElements = serializeElements(elements, serializer);
+ this.serializer = serializer;
+ }
+
+ private List<byte[]> serializeElements(
+ Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ List<byte[]> serializeElements = new ArrayList<>();
+
+ for (RowData element : elements) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+ try {
+ serializer.serialize(element, wrapper);
+ } catch (Exception e) {
+ throw new TableException(
+ "Serializing the source elements failed: " + e.getMessage(), e);
+ }
+ serializeElements.add(baos.toByteArray());
+ }
+ return serializeElements;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<RowData, ValuesSourceSplit> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return new ValuesSourceReader(serializedElements, serializer, readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<ValuesSourceSplit, NoOpEnumState> createEnumerator(
+ SplitEnumeratorContext<ValuesSourceSplit> enumContext) throws Exception {
Review Comment:
nit: throws Exception is not used.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class ValuesSourceReader implements SourceReader<RowData, ValuesSourceSplit> {
+ private static final Logger LOG = LoggerFactory.getLogger(ValuesSourceReader.class);
+
+ /** The context for this reader, to communicate with the enumerator. */
+ private final SourceReaderContext context;
+
+ /** The availability future. This reader is available as soon as a split is assigned. */
+ private CompletableFuture<Void> availability;
+
+ private final List<byte[]> serializedElements;
+ private final TypeSerializer<RowData> serializer;
+ private List<RowData> elements;
+
+ /** The remaining splits that were assigned but not yet processed. */
+ private final Queue<ValuesSourceSplit> remainingSplits;
+
+ private boolean noMoreSplits;
+
+ public ValuesSourceReader(
+ List<byte[]> serializedElements,
+ TypeSerializer<RowData> serializer,
+ SourceReaderContext context) {
+ this.serializedElements = serializedElements;
+ this.serializer = serializer;
+ this.context = context;
+ this.availability = new CompletableFuture<>();
+ this.remainingSplits = new ArrayDeque<>();
+ }
+
+ @Override
+ public void start() {
+ elements = new ArrayList<>();
+ for (byte[] bytes : serializedElements) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
Review Comment:
Could also be wrapped with try-with-resources.
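As with the serialization side, a sketch of the suggested shape; the field and type names are taken from the diff above, and the helper itself is hypothetical:

```java
// Hypothetical helper illustrating try-with-resources around the deserialization loop.
private static List<RowData> deserializeElements(
        List<byte[]> serializedElements, TypeSerializer<RowData> serializer) {
    List<RowData> elements = new ArrayList<>();
    for (byte[] bytes : serializedElements) {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
            DataInputView input = new DataInputViewStreamWrapper(bais);
            elements.add(serializer.deserialize(input));
        } catch (Exception e) {
            throw new TableException("Failed to deserialize a source element.", e);
        }
    }
    return elements;
}
```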
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class DynamicFilteringValuesSourceReader
Review Comment:
Similar to `ValuesSourceReader`, we'd better not implement it from scratch
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> {
+ private final TypeSerializer<RowData> serializer;
+
+ private final List<byte[]> serializedElements;
+
+ public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ Preconditions.checkState(serializer != null, "serializer not set");
+ this.serializedElements = serializeElements(elements, serializer);
+ this.serializer = serializer;
+ }
+
+ private List<byte[]> serializeElements(
+ Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+ List<byte[]> serializeElements = new ArrayList<>();
+
+ for (RowData element : elements) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class ValuesSourceReader implements SourceReader<RowData, ValuesSourceSplit> {
Review Comment:
It looks to me that it might not be a good practice to implement a SourceReader directly from scratch, since it in fact carries a lot of "default protocol" about how the runtime interacts with the new sources. If there are changes on the runtime side in the future, the implementation here might also need some rework.
It looks better to me to extend `SourceReaderBase` or `SingleThreadMultiplexSourceReaderBase`.
If we reach an agreement on this point, it is also acceptable to change the implementation in a separate PR.
(A PoC implementation might look like the following:
```java
public class MyValuesSourceReader
        extends SingleThreadMultiplexSourceReaderBase<RowData, RowData, ValuesSourceSplit, byte[]> {

    public MyValuesSourceReader(
            List<byte[]> serializedElements,
            TypeSerializer<RowData> serializer,
            SourceReaderContext readerContext,
            Configuration config) {
        super(
                () -> new MyValueSourceSplitReader(serializedElements, serializer),
                new MyRecordEmitter(),
                config,
                readerContext);
    }

    @Override
    public void start() {
        // we request a split only if we did not get splits during the checkpoint restore
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected void onSplitFinished(Map<String, byte[]> finishedSplitIds) {
        context.sendSplitRequest();
    }

    @Override
    protected byte[] initializedState(ValuesSourceSplit split) {
        return new byte[0];
    }

    @Override
    protected ValuesSourceSplit toSplitType(String splitId, byte[] splitState) {
        return new ValuesSourceSplit(Integer.parseInt(splitId));
    }

    public static class MyValueSourceSplitReader
            implements SplitReader<RowData, ValuesSourceSplit> {

        private final List<RowData> elements = new ArrayList<>();
        private final Queue<ValuesSourceSplit> splits = new ArrayDeque<>();
        private boolean nextIsFinish;

        public MyValueSourceSplitReader(
                List<byte[]> serializedElements, TypeSerializer<RowData> serializer) {
            for (byte[] bytes : serializedElements) {
                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                final DataInputView input = new DataInputViewStreamWrapper(bais);
                try {
                    RowData next = serializer.deserialize(input);
                    elements.add(next);
                } catch (Exception e) {
                    throw new TableException(
                            "Failed to deserialize an element from the source. "
                                    + "If you are using user-defined serialization (Value and Writable types), check the "
                                    + "serialization functions.\nSerializer is "
                                    + serializer,
                            e);
                }
            }
        }

        @Override
        public RecordsWithSplitIds<RowData> fetch() throws IOException {
            System.out.println("Fetch splits is " + splits);
            if (nextIsFinish) {
                ValuesSourceSplit split = splits.poll();
                nextIsFinish = false;
                return new RecordsBySplits<>(
                        Collections.emptyMap(), Collections.singleton(split.splitId()));
            } else {
                ValuesSourceSplit split = splits.peek();
                nextIsFinish = true;
                return new RecordsBySplits<>(
                        Collections.singletonMap(
                                split.splitId(),
                                Collections.singletonList(elements.get(split.getIndex()))),
                        Collections.emptySet());
            }
        }

        @Override
        public void handleSplitsChanges(SplitsChange<ValuesSourceSplit> splitsChanges) {
            if (!(splitsChanges instanceof SplitsAddition)) {
                throw new UnsupportedOperationException(
                        String.format(
                                "The SplitChange type of %s is not supported.",
                                splitsChanges.getClass()));
            }
            splits.addAll(splitsChanges.splits());
        }

        @Override
        public void wakeUp() {}

        @Override
        public void close() throws Exception {}
    }

    public static class MyRecordEmitter implements RecordEmitter<RowData, RowData, byte[]> {
        @Override
        public void emitRecord(RowData element, SourceOutput<RowData> output, byte[] splitState) {
            output.collect(element);
        }
    }
}
```
)
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -960,11 +997,12 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter)
                        final RowData rowData = (RowData) converter.toInternal(row);
if (rowData != null) {
rowData.setRowKind(row.getKind());
- result.add(rowData);
+ partitionResult.add(rowData);
+ size++;
Review Comment:
It seems size == partitionResult.size()? Could we directly use partitionResult.size()?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.enumerator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A SplitEnumerator implementation for dynamic filtering source. */
+public class DynamicFilteringValuesSourceEnumerator
+ implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class);
+
+ private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context;
+ private final List<ValuesSourcePartitionSplit> allSplits;
+ private final List<String> dynamicFilteringFields;
+ private transient boolean receivedDynamicFilteringEvent;
+ private transient List<ValuesSourcePartitionSplit> remainingSplits;
+
+ public DynamicFilteringValuesSourceEnumerator(
+ SplitEnumeratorContext<ValuesSourcePartitionSplit> context,
+ List<ValuesSourcePartitionSplit> allSplits,
+ List<String> dynamicFilteringFields) {
+ this.context = context;
+ this.allSplits = allSplits;
+ this.dynamicFilteringFields = dynamicFilteringFields;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ if (!receivedDynamicFilteringEvent) {
+ throw new IllegalStateException("DynamicFilteringEvent has not receive");
+ }
+ if (remainingSplits.isEmpty()) {
+ context.signalNoMoreSplits(subtaskId);
+ LOG.info("No more splits available for subtask {}", subtaskId);
+ } else {
+ ValuesSourcePartitionSplit split = remainingSplits.remove(0);
+ LOG.debug("Assigned split to subtask {} : {}", subtaskId, split);
+ context.assignSplit(split, subtaskId);
+ }
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof DynamicFilteringEvent) {
+ LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);
+ receivedDynamicFilteringEvent = true;
+ DynamicFilteringData dynamicFilteringData =
+ ((DynamicFilteringEvent) sourceEvent).getData();
+ assignPartitions(dynamicFilteringData);
+ } else {
+ LOG.error("Received unrecognized event: {}", sourceEvent);
+ }
+ }
+
+ private void assignPartitions(DynamicFilteringData data) {
+ if (data.isFiltering()) {
+ remainingSplits = new ArrayList<>();
+ for (ValuesSourcePartitionSplit split : allSplits) {
+ List<String> values =
+ dynamicFilteringFields.stream()
+ .map(k -> split.getPartition().get(k))
+ .collect(Collectors.toList());
+ LOG.info("values: " + values);
+ if (data.contains(generateRowData(values, data.getRowType()))) {
+ remainingSplits.add(split);
+ }
+ }
+ } else {
+ remainingSplits = new ArrayList<>(allSplits);
+ }
+ LOG.info("remainingSplits: " + remainingSplits);
+ }
+
+ private GenericRowData generateRowData(List<String> partitionValues, RowType rowType) {
+ Preconditions.checkArgument(partitionValues.size() == rowType.getFieldCount());
+ Object[] values = new Object[partitionValues.size()];
+ for (int i = 0; i < rowType.getFieldCount(); ++i) {
+ switch (rowType.getTypeAt(i).getTypeRoot()) {
+ case VARCHAR:
+ values[i] = partitionValues.get(i);
+ break;
+ case INTEGER:
+ values[i] = Integer.valueOf(partitionValues.get(i));
+ break;
+ case BIGINT:
+ values[i] = Long.valueOf(partitionValues.get(i));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ rowType.getTypeAt(i).getTypeRoot() + " is not supported.");
+ }
+ }
+ return GenericRowData.of(values);
+ }
+
+ @Override
+ public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int subtaskId) {
+ remainingSplits.addAll(splits);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+ return null;
Review Comment:
Is this a typo?
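If the `return null` above is indeed unintended, one possible fix (assuming `NoOpEnumState` has a no-argument constructor, as its name suggests) would be:

```java
@Override
public NoOpEnumState snapshotState(long checkpointId) throws Exception {
    return new NoOpEnumState();
}
```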
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]