This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0dc8890f1b8 [FLINK-28711][hive] Hive source supports dynamic filtering
0dc8890f1b8 is described below
commit 0dc8890f1b8164bc37bd8b209e6d4eef37386087
Author: Gen Luo <[email protected]>
AuthorDate: Tue Aug 2 20:16:58 2022 +0800
[FLINK-28711][hive] Hive source supports dynamic filtering
This closes #20415
---
.../file/src/enumerate/DynamicFileEnumerator.java | 48 +++++
.../file/src/impl/DynamicFileSplitEnumerator.java | 207 +++++++++++++++++++
.../src/impl/DynamicFileSplitEnumeratorTest.java | 190 ++++++++++++++++++
.../hive/DynamicHiveSplitEnumerator.java | 35 ++++
.../apache/flink/connectors/hive/HiveSource.java | 26 +++
.../flink/connectors/hive/HiveSourceBuilder.java | 10 +
.../hive/HiveSourceDynamicFileEnumerator.java | 221 +++++++++++++++++++++
.../flink/connectors/hive/HiveTableSource.java | 57 +++++-
.../flink/connectors/hive/HiveDialectITCase.java | 135 +++++++++++++
.../hive/HiveSourceDynamicFileEnumeratorTest.java | 188 ++++++++++++++++++
.../connector/source/DynamicFilteringData.java | 2 +
.../planner/connectors/DynamicSourceUtils.java | 20 --
.../batch/DynamicPartitionPruningRule.java | 35 +++-
13 files changed, 1151 insertions(+), 23 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
new file mode 100644
index 00000000000..416c9b58deb
--- /dev/null
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+
+/**
+ * {@code FileEnumerator} that supports dynamic filtering. The enumerator only enumerates
+ * splits that exist in the given {@link DynamicFilteringData}; it enumerates all splits if
+ * no DynamicFilteringData has been provided when #enumerateSplits is called.
+ */
+@PublicEvolving
+public interface DynamicFileEnumerator extends FileEnumerator {
+
+ /**
+ * Provides a {@link DynamicFilteringData} for filtering while the enumerator is
+ * enumerating splits.
+ *
+ * <p>The {@link DynamicFilteringData} is typically collected by a collector operator, and
+ * transferred here by a coordinating event. The method should never be called directly by
+ * users.
+ *
+ * @param data the dynamic filtering data to filter the enumerated splits with
+ */
+ void setDynamicFilteringData(DynamicFilteringData data);
+
+ /** Factory for the {@link DynamicFileEnumerator}. */
+ @FunctionalInterface
+ interface Provider extends FileEnumerator.Provider {
+
+ /** Creates a {@link DynamicFileEnumerator}. */
+ DynamicFileEnumerator create();
+ }
+}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
new file mode 100644
index 00000000000..bec6b6d904e
--- /dev/null
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import
org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SplitEnumerator} implementation that supports dynamic filtering.
+ *
+ * <p>This enumerator handles {@link DynamicFilteringEvent}s and keeps only the desired input
+ * splits, with the support of the {@link DynamicFileEnumerator}.
+ *
+ * <p>If the enumerator receives the first split request before any dynamic filtering data is
+ * received, it enumerates all splits. If a {@link DynamicFilteringEvent} is received while the
+ * fully enumerated splits are being handed out, the remaining splits are filtered accordingly.
+ *
+ * <p>Only batch execution is supported: {@link #snapshotState(long)} always throws.
+ */
+@Internal
+public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
+        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
+                SupportsHandleExecutionAttemptSourceEvent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicFileSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<SplitT> context;
+
+    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
+
+    private final FileSplitAssigner.Provider splitAssignerFactory;
+
+    /**
+     * Stores the ids of the splits that have been assigned. The split assigner may be rebuilt
+     * when a DynamicFilteringEvent is received. After that, the splits that are already assigned
+     * could be offered a second time by the new assigner. We have to retain this state and use
+     * the set to filter out the splits that have already been assigned.
+     */
+    private final Set<String> assignedSplits;
+
+    /** Ids of the splits returned by the most recent enumeration; unset before the first one. */
+    private Set<String> allEnumeratingSplits;
+
+    /** Assigner over the most recently enumerated splits; {@code null} before the first one. */
+    private FileSplitAssigner splitAssigner;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates the enumerator. No splits are enumerated until the first split request or the
+     * first {@link DynamicFilteringEvent} arrives.
+     *
+     * @param context the enumerator context used to assign splits and query readers
+     * @param fileEnumeratorFactory factory of the enumerator that lists (and filters) the splits
+     * @param splitAssignerFactory factory of the assigner that hands out the enumerated splits
+     */
+    public DynamicFileSplitEnumerator(
+            SplitEnumeratorContext<SplitT> context,
+            DynamicFileEnumerator.Provider fileEnumeratorFactory,
+            FileSplitAssigner.Provider splitAssignerFactory) {
+        this.context = checkNotNull(context);
+        this.splitAssignerFactory = checkNotNull(splitAssignerFactory);
+        this.fileEnumeratorFactory = checkNotNull(fileEnumeratorFactory);
+        this.assignedSplits = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    /**
+     * Assigns the next not-yet-assigned split to the requesting subtask, or signals that no more
+     * splits are available. If no split assigner exists yet, all splits are enumerated first.
+     */
+    @Override
+    public void handleSplitRequest(int subtask, @Nullable String hostname) {
+        if (!context.registeredReaders().containsKey(subtask)) {
+            // reader failed between sending the request and now. skip this request.
+            return;
+        }
+
+        if (splitAssigner == null) {
+            // No DynamicFilteringData is received before the first split request,
+            // create a split assigner that handles all splits
+            createSplitAssigner(null);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            final String hostInfo =
+                    hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
+            LOG.debug("Subtask {} {} is requesting a file source split", subtask, hostInfo);
+        }
+
+        final Optional<FileSourceSplit> nextSplit = getNextUnassignedSplit(hostname);
+        if (nextSplit.isPresent()) {
+            // Unchecked: SplitT erases to FileSourceSplit, so this cast is not verified at
+            // runtime. The file enumerator is presumed to produce splits of type SplitT.
+            @SuppressWarnings("unchecked")
+            final SplitT split = (SplitT) nextSplit.get();
+            context.assignSplit(split, subtask);
+            assignedSplits.add(split.splitId());
+            LOG.debug("Assigned split to subtask {} : {}", subtask, split);
+        } else {
+            context.signalNoMoreSplits(subtask);
+            LOG.info("No more splits available for subtask {}", subtask);
+        }
+    }
+
+    /**
+     * Polls the split assigner until it returns a split whose id is not in {@link
+     * #assignedSplits}, or until the assigner is exhausted.
+     */
+    private Optional<FileSourceSplit> getNextUnassignedSplit(@Nullable String hostname) {
+        Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+        while (nextSplit.isPresent()) {
+            FileSourceSplit split = nextSplit.get();
+            // ignore the split if it has been assigned
+            if (!assignedSplits.contains(split.splitId())) {
+                return nextSplit;
+            }
+            nextSplit = splitAssigner.getNext(hostname);
+        }
+        return nextSplit;
+    }
+
+    /**
+     * Rebuilds the split assigner from the data of a {@link DynamicFilteringEvent}. Any other
+     * event is logged as an error and otherwise ignored.
+     */
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof DynamicFilteringEvent) {
+            // Receiving the filtering data is the expected flow, so it is not a warning.
+            LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
+            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
+        } else {
+            LOG.error("Received unrecognized event: {}", sourceEvent);
+        }
+    }
+
+    /**
+     * Enumerates the splits with a fresh file enumerator and replaces the split assigner.
+     *
+     * @param dynamicFilteringData the data to filter with, or {@code null} to enumerate all
+     * @throws FlinkRuntimeException if the enumeration fails with an {@link IOException}
+     */
+    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
+        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
+        if (dynamicFilteringData != null) {
+            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
+        }
+        Collection<FileSourceSplit> splits;
+        try {
+            // NOTE(review): a one-element array holding null is passed as the paths; the
+            // enumerators visible in this change ignore the argument - confirm for others.
+            splits = fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());
+            allEnumeratingSplits =
+                    splits.stream().map(FileSourceSplit::splitId).collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not enumerate file splits", e);
+        }
+        splitAssigner = splitAssignerFactory.create(splits);
+    }
+
+    /**
+     * Returns splits to the assigner for re-assignment. Nothing happens if no split assigner has
+     * been created yet.
+     */
+    @Override
+    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
+        LOG.debug("Dynamic File Source Enumerator adds splits back: {}", splits);
+        if (splitAssigner != null) {
+            List<FileSourceSplit> fileSplits = new ArrayList<>(splits);
+            // Only add back splits that are part of the current enumeration. A split may have
+            // been filtered out after it was assigned.
+            fileSplits.removeIf(s -> !allEnumeratingSplits.contains(s.splitId()));
+            // Added splits should be removed from assignedSplits for re-assignment
+            fileSplits.forEach(s -> assignedSplits.remove(s.splitId()));
+            splitAssigner.addSplits(fileSplits);
+        }
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
+        throw new UnsupportedOperationException(
+                "DynamicFileSplitEnumerator only supports batch execution.");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
+        // Only events that do not depend on the attempt number are recognized, so the attempt
+        // number is dropped here.
+        handleSourceEvent(subtaskId, sourceEvent);
+    }
+}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
new file mode 100644
index 00000000000..1e098c48609
--- /dev/null
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DynamicFileSplitEnumerator}. */
+class DynamicFileSplitEnumeratorTest {
+
+    /** Ids of every split the test enumerator knows about. */
+    private static final String[] ALL_SPLITS = {"0", "1", "2", "3", "4"};
+
+    /** Ids of the splits that survive the mocked dynamic filtering. */
+    private static final String[] FILTERED_SPLITS = {"1", "3"};
+
+    @Test
+    void testEnumerating() {
+        MockSplitEnumeratorContext<TestSplit> context = newContextWithReader();
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                newEnumerator(context, ALL_SPLITS, ALL_SPLITS);
+
+        requestSplits(enumerator, ALL_SPLITS.length);
+
+        assertThat(getAssignedSplits(context)).containsExactlyInAnyOrder(ALL_SPLITS);
+    }
+
+    @Test
+    void testDynamicFiltering() {
+        MockSplitEnumeratorContext<TestSplit> context = newContextWithReader();
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                newEnumerator(context, ALL_SPLITS, FILTERED_SPLITS);
+
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+        requestSplits(enumerator, 2);
+
+        assertThat(getAssignedSplits(context)).containsExactlyInAnyOrder(FILTERED_SPLITS);
+    }
+
+    @Test
+    void testReceiveDynamicFilteringDataAfterStarted() {
+        MockSplitEnumeratorContext<TestSplit> context = newContextWithReader();
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                newEnumerator(context, ALL_SPLITS, FILTERED_SPLITS);
+
+        requestSplits(enumerator, 1);
+        List<String> expected = getAssignedSplits(context);
+
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+
+        // request more than 5-1=4 times to check whether a split will be assigned twice
+        requestSplits(enumerator, 6);
+
+        expected.addAll(Arrays.asList(FILTERED_SPLITS));
+        String[] expectedDistinct = expected.stream().distinct().toArray(String[]::new);
+        assertThat(getAssignedSplits(context)).containsExactlyInAnyOrder(expectedDistinct);
+    }
+
+    @Test
+    void testAddSplitsBack() {
+        MockSplitEnumeratorContext<TestSplit> context = newContextWithReader();
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                newEnumerator(context, ALL_SPLITS, FILTERED_SPLITS);
+
+        requestSplits(enumerator, ALL_SPLITS.length);
+
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+
+        List<TestSplit> returnedSplits =
+                Arrays.stream(ALL_SPLITS).map(TestSplit::new).collect(Collectors.toList());
+        enumerator.addSplitsBack(returnedSplits, 0);
+
+        requestSplits(enumerator, ALL_SPLITS.length);
+
+        // Everything after the first full round must be exactly the filtered splits.
+        List<String> assigned = getAssignedSplits(context);
+        assertThat(assigned.subList(ALL_SPLITS.length, assigned.size()))
+                .containsExactlyInAnyOrder(FILTERED_SPLITS);
+    }
+
+    /** Creates a context with parallelism 1 and a registered reader for subtask 0. */
+    private static MockSplitEnumeratorContext<TestSplit> newContextWithReader() {
+        MockSplitEnumeratorContext<TestSplit> context = new MockSplitEnumeratorContext<>(1);
+        context.registerReader(new ReaderInfo(0, ""));
+        return context;
+    }
+
+    /** Creates the enumerator under test backed by a {@link TestDynamicFileEnumerator}. */
+    private static DynamicFileSplitEnumerator<TestSplit> newEnumerator(
+            MockSplitEnumeratorContext<TestSplit> context,
+            String[] allSplits,
+            String[] remainingSplits) {
+        return new DynamicFileSplitEnumerator<>(
+                context,
+                () -> new TestDynamicFileEnumerator(allSplits, remainingSplits),
+                SimpleSplitAssigner::new);
+    }
+
+    /** Sends {@code times} split requests from subtask 0 without host information. */
+    private static void requestSplits(
+            DynamicFileSplitEnumerator<TestSplit> enumerator, int times) {
+        for (int i = 0; i < times; i++) {
+            enumerator.handleSplitRequest(0, null);
+        }
+    }
+
+    private static SourceEvent mockDynamicFilteringEvent() {
+        // Mock a DynamicFilteringData, typeInfo and rowType of which are not used.
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        new GenericTypeInfo<>(RowData.class),
+                        RowType.of(),
+                        Collections.emptyList(),
+                        false);
+        return new DynamicFilteringEvent(data);
+    }
+
+    /** Collects, in assignment order, the ids of all splits assigned to subtask 0. */
+    private static List<String> getAssignedSplits(MockSplitEnumeratorContext<TestSplit> context) {
+        return context.getSplitsAssignmentSequence().stream()
+                .flatMap(s -> s.assignment().get(0).stream())
+                .map(FileSourceSplit::splitId)
+                .collect(Collectors.toList());
+    }
+
+    /** A split that is identified by its id only. */
+    private static class TestSplit extends FileSourceSplit {
+        public TestSplit(String id) {
+            super(id, new Path(), 0, 0, 0L, 0);
+        }
+    }
+
+    /**
+     * Enumerator that returns all split ids until dynamic filtering data is set, and the
+     * pre-configured remaining ids afterwards.
+     */
+    private static class TestDynamicFileEnumerator implements DynamicFileEnumerator {
+        private final List<String> idsAfterFiltering;
+        private List<String> currentIds;
+
+        private TestDynamicFileEnumerator(String[] allSplits, String[] remainingSplits) {
+            this.idsAfterFiltering = Arrays.asList(remainingSplits);
+            this.currentIds = Arrays.asList(allSplits);
+        }
+
+        @Override
+        public void setDynamicFilteringData(DynamicFilteringData data) {
+            // Mock filtering result
+            currentIds = idsAfterFiltering;
+        }
+
+        @Override
+        public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) {
+            return currentIds.stream().map(TestSplit::new).collect(Collectors.toList());
+        }
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
new file mode 100644
index 00000000000..79b3db105b5
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.connector.file.src.impl.DynamicFileSplitEnumerator;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+
+/** A {@link DynamicFileSplitEnumerator} for the hive source, bound to {@link HiveSourceSplit}. */
+public class DynamicHiveSplitEnumerator extends
DynamicFileSplitEnumerator<HiveSourceSplit> {
+ /**
+ * Creates the enumerator; all arguments are passed unchanged to the {@link
+ * DynamicFileSplitEnumerator} constructor.
+ */
+ public DynamicHiveSplitEnumerator(
+ SplitEnumeratorContext<HiveSourceSplit> context,
+ DynamicFileEnumerator.Provider fileEnumeratorFactory,
+ FileSplitAssigner.Provider splitAssignerFactory) {
+ super(context, fileEnumeratorFactory, splitAssignerFactory);
+ }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index d7f52098761..3f764c0b05a 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -62,6 +62,10 @@ public class HiveSource<T> extends AbstractFileSource<T,
HiveSourceSplit> {
private final int threadNum;
private final JobConfWrapper jobConfWrapper;
private final List<String> partitionKeys;
+
+ private final String hiveVersion;
+ private final List<String> dynamicFilterPartitionKeys;
+ private final List<HiveTablePartition> partitions;
private final ContinuousPartitionFetcher<Partition, ?> fetcher;
private final HiveTableSource.HiveContinuousPartitionFetcherContext<?>
fetcherContext;
private final ObjectPath tablePath;
@@ -76,6 +80,9 @@ public class HiveSource<T> extends AbstractFileSource<T,
HiveSourceSplit> {
JobConf jobConf,
ObjectPath tablePath,
List<String> partitionKeys,
+ String hiveVersion,
+ @Nullable List<String> dynamicFilterPartitionKeys,
+ List<HiveTablePartition> partitions,
@Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
@Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?>
fetcherContext) {
super(
@@ -92,6 +99,9 @@ public class HiveSource<T> extends AbstractFileSource<T,
HiveSourceSplit> {
this.jobConfWrapper = new JobConfWrapper(jobConf);
this.tablePath = tablePath;
this.partitionKeys = partitionKeys;
+ this.hiveVersion = hiveVersion;
+ this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
+ this.partitions = partitions;
this.fetcher = fetcher;
this.fetcherContext = fetcherContext;
}
@@ -120,6 +130,8 @@ public class HiveSource<T> extends AbstractFileSource<T,
HiveSourceSplit> {
fetcherContext.getConsumeStartOffset(),
Collections.emptyList(),
Collections.emptyList());
+ } else if (dynamicFilterPartitionKeys != null) {
+ return createDynamicSplitEnumerator(enumContext);
} else {
return super.createEnumerator(enumContext);
}
@@ -169,4 +181,18 @@ public class HiveSource<T> extends AbstractFileSource<T,
HiveSourceSplit> {
fetcher,
fetcherContext);
}
+
+    /**
+     * Builds a {@link DynamicHiveSplitEnumerator} whose file enumerators are created by a {@link
+     * HiveSourceDynamicFileEnumerator.Provider} configured from this source's table, dynamic
+     * filter partition keys, partitions, thread number, hive version and job configuration.
+     */
+    private SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>
+            createDynamicSplitEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext) {
+        final HiveSourceDynamicFileEnumerator.Provider enumeratorProvider =
+                new HiveSourceDynamicFileEnumerator.Provider(
+                        tablePath.getFullName(),
+                        dynamicFilterPartitionKeys,
+                        partitions,
+                        threadNum,
+                        hiveVersion,
+                        jobConfWrapper);
+        return new DynamicHiveSplitEnumerator(
+                enumContext, enumeratorProvider, getAssignerFactory());
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 4afff3b181e..185904d7c2e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -91,6 +91,7 @@ public class HiveSourceBuilder {
private int[] projectedFields;
private Long limit;
private List<HiveTablePartition> partitions;
+ private List<String> dynamicFilterPartitionKeys;
/**
* Creates a builder to read a hive table.
@@ -246,6 +247,9 @@ public class HiveSourceBuilder {
jobConf,
tablePath,
partitionKeys,
+ hiveVersion,
+ dynamicFilterPartitionKeys,
+ partitions,
fetcher,
fetcherContext);
}
@@ -259,6 +263,12 @@ public class HiveSourceBuilder {
return this;
}
+ /**
+ * Sets the partition keys used for dynamic filtering. The value is handed to the built
+ * {@link HiveSource}; when it is non-null, HiveSource#createEnumerator may return a dynamic
+ * split enumerator.
+ */
+ public HiveSourceBuilder setDynamicFilterPartitionKeys(
+ List<String> dynamicFilterPartitionKeys) {
+ this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
+ return this;
+ }
+
/** Sets the maximum number of records this source should return. */
public HiveSourceBuilder setLimit(Long limit) {
this.limit = limit;
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
new file mode 100644
index 00000000000..d96999c971f
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.connectors.hive.util.JobConfUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import
org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter;
+import
org.apache.flink.table.data.util.DataFormatConverters.LocalDateTimeConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicFileEnumerator} implementation for hive source. It uses {@link
+ * HiveSourceFileEnumerator#createInputSplits} to generate splits like HiveSourceFileEnumerator,
+ * but only enumerates {@link HiveTablePartition}s that exist in the {@link DynamicFilteringData}
+ * if a DynamicFilteringData is provided.
+ */
+public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
+
+    /** Type roots that {@link #createRowData} can convert. The set is unmodifiable. */
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        // Note: also modify #createRowData if the SUPPORTED_TYPES is modified.
+        Set<LogicalTypeRoot> supportedTypes = new HashSet<>();
+        supportedTypes.add(LogicalTypeRoot.TINYINT);
+        supportedTypes.add(LogicalTypeRoot.SMALLINT);
+        supportedTypes.add(LogicalTypeRoot.INTEGER);
+        supportedTypes.add(LogicalTypeRoot.BIGINT);
+        supportedTypes.add(LogicalTypeRoot.CHAR);
+        supportedTypes.add(LogicalTypeRoot.VARCHAR);
+        supportedTypes.add(LogicalTypeRoot.DATE);
+        supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        // Publish a read-only view so that callers cannot change the shared constant.
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supportedTypes);
+    }
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveSourceDynamicFileEnumerator.class);
+
+    /** Converter for timestamp partition values; created once instead of once per value. */
+    private static final LocalDateTimeConverter TIMESTAMP_CONVERTER =
+            new LocalDateTimeConverter(9);
+
+    private final String table;
+    private final List<String> dynamicFilterPartitionKeys;
+    // For a non-partitioned hive table, this list contains a single partition whose partition
+    // values are empty.
+    private final List<HiveTablePartition> allPartitions;
+    private final int threadNum;
+    private final JobConf jobConf;
+    private final String defaultPartitionName;
+    private final HiveShim hiveShim;
+    /** The partitions to enumerate; all partitions until filtering data narrows them down. */
+    private List<HiveTablePartition> finalPartitions;
+
+    /**
+     * Creates an enumerator that enumerates all given partitions until {@link
+     * #setDynamicFilteringData} is called.
+     *
+     * @param table the table name, used in log messages
+     * @param dynamicFilterPartitionKeys the partition keys whose values form the filter rows
+     * @param allPartitions the partitions to enumerate when no filtering applies
+     * @param threadNum passed to {@link HiveSourceFileEnumerator#createInputSplits}
+     * @param hiveVersion the hive version used to load the {@link HiveShim}
+     * @param jobConf the job configuration
+     */
+    public HiveSourceDynamicFileEnumerator(
+            String table,
+            List<String> dynamicFilterPartitionKeys,
+            List<HiveTablePartition> allPartitions,
+            int threadNum,
+            String hiveVersion,
+            JobConf jobConf) {
+        this.table = checkNotNull(table);
+        this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
+        this.allPartitions = checkNotNull(allPartitions);
+        this.threadNum = threadNum;
+        this.jobConf = checkNotNull(jobConf);
+        this.defaultPartitionName = JobConfUtils.getDefaultPartitionName(jobConf);
+        this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
+        this.finalPartitions = this.allPartitions;
+    }
+
+    /**
+     * Narrows the partitions to enumerate down to those contained in the given data. If the data
+     * is not filtering, all partitions are kept.
+     *
+     * @throws IllegalArgumentException if the field count of the data's row type differs from
+     *     the number of dynamic filter partition keys
+     */
+    @Override
+    public void setDynamicFilteringData(DynamicFilteringData data) {
+        LOG.debug("Filtering partitions of table {} based on the data: {}", table, data);
+        if (!data.isFiltering()) {
+            finalPartitions = allPartitions;
+            return;
+        }
+        finalPartitions = new ArrayList<>();
+        RowType rowType = data.getRowType();
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == dynamicFilterPartitionKeys.size(),
+                "The dynamic filtering row type has %s fields, but %s partition keys are used.",
+                rowType.getFieldCount(),
+                dynamicFilterPartitionKeys.size());
+        for (HiveTablePartition partition : allPartitions) {
+            RowData partitionRow = createRowData(rowType, partition.getPartitionSpec());
+            if (data.contains(partitionRow)) {
+                finalPartitions.add(partition);
+            }
+        }
+        LOG.info(
+                "Dynamic filtering table {}, original partition number is {}, remaining partition number {}",
+                table,
+                allPartitions.size(),
+                finalPartitions.size());
+    }
+
+    /**
+     * Builds the row to look up in the filtering data from a partition spec. Field {@code i}
+     * holds the value of the {@code i}-th dynamic filter partition key, converted according to
+     * the {@code i}-th type of the row type.
+     *
+     * @throws UnsupportedOperationException if a field type is not handled by the switch below
+     */
+    @VisibleForTesting
+    RowData createRowData(RowType rowType, Map<String, String> partitionSpec) {
+        GenericRowData rowData = new GenericRowData(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            String value = partitionSpec.get(dynamicFilterPartitionKeys.get(i));
+            Object convertedValue =
+                    HivePartitionUtils.restorePartitionValueFromType(
+                            hiveShim, value, rowType.getTypeAt(i), defaultPartitionName);
+            // Note: also modify supported types if the switch is modified.
+            switch (rowType.getTypeAt(i).getTypeRoot()) {
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    // No further process is necessary.
+                    break;
+                case CHAR:
+                case VARCHAR:
+                    convertedValue = StringData.fromString((String) convertedValue);
+                    break;
+                case DATE:
+                    convertedValue =
+                            LocalDateConverter.INSTANCE.toInternal((LocalDate) convertedValue);
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    convertedValue =
+                            TIMESTAMP_CONVERTER.toInternal((LocalDateTime) convertedValue);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported type for dynamic filtering:" + rowType.getTypeAt(i));
+            }
+            rowData.setField(i, convertedValue);
+        }
+        return rowData;
+    }
+
+    /**
+     * Creates the input splits of the partitions that remain after filtering. The {@code paths}
+     * argument is not used.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        return new ArrayList<>(
+                HiveSourceFileEnumerator.createInputSplits(
+                        minDesiredSplits, finalPartitions, threadNum, jobConf));
+    }
+
+    @VisibleForTesting
+    List<HiveTablePartition> getFinalPartitions() {
+        return finalPartitions;
+    }
+
+    /** A factory to create {@link HiveSourceDynamicFileEnumerator}. */
+    public static class Provider implements DynamicFileEnumerator.Provider {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String table;
+        private final List<String> dynamicFilterPartitionKeys;
+        private final List<HiveTablePartition> partitions;
+        private final int threadNum;
+        private final String hiveVersion;
+        private final JobConfWrapper jobConfWrapper;
+
+        /** Stores the arguments used to construct each enumerator in {@link #create()}. */
+        public Provider(
+                String table,
+                List<String> dynamicFilterPartitionKeys,
+                List<HiveTablePartition> partitions,
+                int threadNum,
+                String hiveVersion,
+                JobConfWrapper jobConfWrapper) {
+            this.table = checkNotNull(table);
+            this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
+            this.partitions = checkNotNull(partitions);
+            this.threadNum = threadNum;
+            this.hiveVersion = checkNotNull(hiveVersion);
+            this.jobConfWrapper = checkNotNull(jobConfWrapper);
+        }
+
+        @Override
+        public DynamicFileEnumerator create() {
+            return new HiveSourceDynamicFileEnumerator(
+                    table,
+                    dynamicFilterPartitionKeys,
+                    partitions,
+                    threadNum,
+                    hiveVersion,
+                    jobConfWrapper.conf());
+        }
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index cc407efcec0..a339502b016 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -52,6 +52,7 @@ import
org.apache.flink.table.connector.format.FileBasedStatisticsReportableInpu
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -60,6 +61,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -72,7 +74,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -82,6 +86,8 @@ import static
org.apache.flink.connector.file.table.FileSystemConnectorOptions.P
import static
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
import static
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
import static
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
/** A TableSource implementation to read data from Hive tables. */
public class HiveTableSource
@@ -89,7 +95,8 @@ public class HiveTableSource
SupportsPartitionPushDown,
SupportsProjectionPushDown,
SupportsLimitPushDown,
- SupportsStatisticReport {
+ SupportsStatisticReport,
+ SupportsDynamicFiltering {
private static final Logger LOG =
LoggerFactory.getLogger(HiveTableSource.class);
private static final String HIVE_TRANSFORMATION = "hive";
@@ -104,6 +111,7 @@ public class HiveTableSource
// Remaining partition specs after partition pruning is performed. Null if
pruning is not pushed
// down.
@Nullable private List<Map<String, String>> remainingPartitions = null;
+ @Nullable private List<String> dynamicFilterPartitionKeys = null;
protected int[] projectedFields;
private Long limit = null;
@@ -181,6 +189,7 @@ public class HiveTableSource
execEnv,
sourceBuilder
.setPartitions(hivePartitionsToRead)
+
.setDynamicFilterPartitionKeys(dynamicFilterPartitionKeys)
.buildWithDefaultBulkFormat())
.setParallelism(parallelism);
}
@@ -247,6 +256,51 @@ public class HiveTableSource
}
}
+ @Override
+ public List<String> applyDynamicFiltering(List<String>
candidateFilterFields) {
+ if (catalogTable.getPartitionKeys() != null
+ && catalogTable.getPartitionKeys().size() != 0) {
+ checkArgument(
+ !candidateFilterFields.isEmpty(),
+ "At least one field should be provided for dynamic
filtering");
+
+ // only accept partition fields of supported types to do dynamic
partition pruning
+ List<String> dynamicFilterPartitionKeys = new ArrayList<>();
+ for (String field : candidateFilterFields) {
+ if (catalogTable.getPartitionKeys().contains(field)
+ &&
HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
+ catalogTable
+ .getSchema()
+ .getFieldDataType(field)
+ .map(DataType::getLogicalType)
+ .map(LogicalType::getTypeRoot)
+ .orElse(null))) {
+ dynamicFilterPartitionKeys.add(field);
+ }
+ }
+ if (dynamicFilterPartitionKeys.isEmpty()) {
+ LOG.warn(
+ "No dynamic filter field is accepted,"
+ + " only partition fields can use for dynamic
filtering.");
+ }
+
+ // sort before the check so that lists with the same elements are in the
same order
+ dynamicFilterPartitionKeys.sort(String::compareTo);
+ checkState(
+ this.dynamicFilterPartitionKeys == null
+ ||
this.dynamicFilterPartitionKeys.equals(dynamicFilterPartitionKeys),
+ "Dynamic filtering is applied twice but with different
keys: %s != %s",
+ this.dynamicFilterPartitionKeys,
+ dynamicFilterPartitionKeys);
+
+ this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
+ return dynamicFilterPartitionKeys;
+ } else {
+ LOG.warn("No dynamic filter field is accepted since the table is
non-partitioned.");
+ return Collections.emptyList();
+ }
+ }
+
@Override
public boolean supportsNestedProjection() {
return false;
@@ -273,6 +327,7 @@ public class HiveTableSource
source.remainingPartitions = remainingPartitions;
source.projectedFields = projectedFields;
source.limit = limit;
+ source.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
return source;
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 0111e3e2f3d..aed6242c9f6 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.SqlDialect;
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -35,6 +37,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
@@ -1194,6 +1197,138 @@ public class HiveDialectITCase {
}
}
+ @Test
+ public void testDynamicPartitionPruning() throws Exception {
+ // dimension table
+ tableEnv.executeSql("create table dim (x int,y string,z int)");
+ tableEnv.executeSql("insert into dim values
(1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+ // partitioned fact table
+ tableEnv.executeSql("create table fact (a int, b bigint, c string)
partitioned by (p int)");
+ tableEnv.executeSql(
+ "insert into fact partition (p=1) values
(10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=2) values
(20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=3) values
(30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+ .await();
+
+ System.out.println(
+ tableEnv.explainSql(
+ "select a, b, c, p, x, y from fact, dim where x = p
and z = 1 order by a"));
+
+ tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
+
tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
+
tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM,
false);
+
+ String sql = "select a, b, c, p, x, y from fact, dim where x = p and z
= 1 order by a";
+ String sqlSwapFactDim =
+ "select a, b, c, p, x, y from dim, fact where x = p and z = 1
order by a";
+
+ String expected =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12,
102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2,
b], +I[22, 202, ccc, 2, 2, b]]";
+
+ // Check dynamic partition pruning is working
+ String plan = tableEnv.explainSql(sql);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ plan = tableEnv.explainSql(sqlSwapFactDim);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ // Validate results
+ List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ // Validate results with table statistics
+ tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+ .get()
+ .alterTableStatistics(
+ new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
+ new CatalogTableStatistics(3, -1, -1, -1),
+ false);
+
+ results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+ assertThat(results.toString()).isEqualTo(expected);
+ }
+
+ @Test
+ public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
+ tableEnv.executeSql("create table dim (x int,y string,z int)");
+ tableEnv.executeSql("insert into dim values
(1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+ // partitioned fact table
+ tableEnv.executeSql("create table fact (a int, b bigint, c string)
partitioned by (p int)");
+ tableEnv.executeSql(
+ "insert into fact partition (p=1) values
(10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=2) values
(20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=3) values
(30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+ .await();
+
+ // second partitioned fact table
+ tableEnv.executeSql(
+ "create table fact2 (a int, b bigint, c string) partitioned by
(p int)");
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=1) values
(40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=2) values
(50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=3) values
(60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
+ .await();
+
+ tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
+
+
tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
+
tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM,
false);
+
+ // two fact sources share the same dynamic filter
+ String sql =
+ "select * from ((select a, b, c, p, x, y from fact, dim where
x = p and z = 1) "
+ + "union all "
+ + "(select a, b, c, p, x, y from fact2, dim where x =
p and z = 1)) t order by a";
+ String expected =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12,
102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2,
b], +I[22, 202, ccc, 2, 2, b], "
+ + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1,
a], +I[42, 102, ccc, 1, 1, a], "
+ + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2,
b], +I[52, 202, ccc, 2, 2, b]]";
+
+ String plan = tableEnv.explainSql(sql);
+
assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
+
+ List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ // two fact sources use different dynamic filters
+ String sql2 =
+ "select * from ((select a, b, c, p, x, y from fact, dim where
x = p and z = 1) "
+ + "union all "
+ + "(select a, b, c, p, x, y from fact2, dim where x =
p and z = 2)) t order by a";
+ String expected2 =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12,
102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2,
b], +I[22, 202, ccc, 2, 2, b], "
+ + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3,
c], +I[62, 302, ccc, 3, 3, c]]";
+
+ plan = tableEnv.explainSql(sql2);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ results = queryResult(tableEnv.sqlQuery(sql2));
+ assertThat(results.toString()).isEqualTo(expected2);
+ }
+
private void verifyUnsupportedOperation(String ddl) {
assertThatThrownBy(() -> tableEnv.executeSql(ddl))
.isInstanceOf(ValidationException.class)
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
new file mode 100644
index 00000000000..14a4288bb92
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import
org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter;
+import
org.apache.flink.table.data.util.DataFormatConverters.LocalDateTimeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for HiveSourceDynamicFileEnumerator. */
+class HiveSourceDynamicFileEnumeratorTest {
+
+ @Test
+ void testFiltering() {
+ List<String> keys = Collections.singletonList("a");
+ List<Map<String, String>> partitionSpecs =
+ Arrays.asList(
+ Collections.singletonMap("a", "31"),
Collections.singletonMap("a", "32"));
+ HiveSourceDynamicFileEnumerator enumerator =
createTestEnumerator(keys, partitionSpecs);
+
+ assertThat(enumerator.getFinalPartitions()).hasSize(2);
+ assertThat(
+ enumerator.getFinalPartitions().stream()
+ .map(HiveTablePartition::getPartitionSpec)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(partitionSpecs.toArray(new Map[0]));
+
+ RowType rowType = RowType.of(new IntType());
+ TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+ GenericRowData filteringRow = new GenericRowData(1);
+ filteringRow.setField(0, 31);
+ DynamicFilteringData data =
+ new DynamicFilteringData(
+ InternalTypeInfo.of(rowType),
+ rowType,
+ Collections.singletonList(serialize(rowTypeInfo,
filteringRow)),
+ true);
+ enumerator.setDynamicFilteringData(data);
+
+ assertThat(enumerator.getFinalPartitions()).hasSize(1);
+
assertThat(enumerator.getFinalPartitions().get(0).getPartitionSpec().get("a"))
+ .isEqualTo("31");
+ }
+
+ @Test
+ void testCreateRowSupportedTypes() {
+ List<Tuple3<LogicalType, Object, String>> testTypeValues = new
ArrayList<>();
+ testTypeValues.add(new Tuple3<>(new IntType(), 42, "42"));
+ testTypeValues.add(new Tuple3<>(new BigIntType(), 9876543210L,
"9876543210"));
+ testTypeValues.add(new Tuple3<>(new SmallIntType(), (short) 41, "41"));
+ testTypeValues.add(new Tuple3<>(new TinyIntType(), (byte) 40, "40"));
+ testTypeValues.add(new Tuple3<>(new VarCharType(),
StringData.fromString("1234"), "1234"));
+ testTypeValues.add(new Tuple3<>(new CharType(),
StringData.fromString("7"), "7"));
+ testTypeValues.add(
+ new Tuple3<>(
+ new DateType(),
+
LocalDateConverter.INSTANCE.toInternal(LocalDate.of(2022, 2, 22)),
+ "2022-2-22"));
+ testTypeValues.add(
+ new Tuple3<>(
+ new TimestampType(9),
+ new LocalDateTimeConverter(9)
+ .toInternal(LocalDateTime.of(2022, 2, 22, 22,
2, 20, 20222022)),
+ "2022-2-22 22:02:20.020222022"));
+
+ // Default partition values
+ testTypeValues.add(
+ new Tuple3<>(
+ new VarCharType(),
+
StringData.fromString(HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
+ HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal));
+ testTypeValues.add(
+ new Tuple3<>(
+ new IntType(), null,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal));
+
+ RowType rowType =
+ RowType.of(testTypeValues.stream().map(t ->
t.f0).toArray(LogicalType[]::new));
+ List<String> keys = new ArrayList<>();
+ Map<String, String> spec = new HashMap<>();
+ for (int i = 0; i < testTypeValues.size(); i++) {
+ keys.add(String.valueOf(i));
+ spec.put(String.valueOf(i), testTypeValues.get(i).f2);
+ }
+ HiveSourceDynamicFileEnumerator enumerator =
+ createTestEnumerator(keys, Collections.emptyList());
+
+ RowData result = enumerator.createRowData(rowType, spec);
+
+ for (int i = 0; i < testTypeValues.size(); i++) {
+ LogicalType type = testTypeValues.get(i).f0;
+ Object expected = testTypeValues.get(i).f1;
+ FieldGetter getter = RowData.createFieldGetter(type, i);
+ assertThat(getter.getFieldOrNull(result))
+ .withFailMessage(
+ () ->
+ "Mismatching row type "
+ + type
+ + ", expected:"
+ + expected
+ + ", actual:"
+ + getter.getFieldOrNull(result))
+ .isEqualTo(testTypeValues.get(i).f1);
+ }
+ }
+
+ private HiveSourceDynamicFileEnumerator createTestEnumerator(
+ List<String> keys, List<Map<String, String>> partitionSpecs) {
+ return new HiveSourceDynamicFileEnumerator(
+ "",
+ keys,
+ partitionSpecs.stream()
+ .map(
+ spec ->
+ new HiveTablePartition(
+ new StorageDescriptor(), spec,
new Properties()))
+ .collect(Collectors.toList()),
+ 1,
+ HiveShimLoader.getHiveVersion(),
+ new JobConf());
+ }
+
+ private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ typeInfo.createSerializer(new ExecutionConfig())
+ .serialize(row, new DataOutputViewStreamWrapper(baos));
+ } catch (IOException e) {
+ // rethrow as RuntimeException so this method can be used in a lambda
+ throw new RuntimeException(e);
+ }
+ return baos.toByteArray();
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
index 0a81bffdd4c..f0be9d200e7 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.connector.source;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,6 +44,7 @@ import java.util.stream.IntStream;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Data for dynamic filtering. */
+@PublicEvolving
public class DynamicFilteringData implements Serializable {
private final TypeInformation<RowData> typeInfo;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index c2497cfbb36..989a20c6670 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -19,10 +19,8 @@
package org.apache.flink.table.planner.connectors;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
@@ -38,7 +36,6 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import
org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
@@ -65,7 +62,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -259,22 +255,6 @@ public final class DynamicSourceUtils {
return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
}
- /** Returns true if the source is FLIP-27 source, else false. */
- public static boolean isNewSource(ScanTableSource scanTableSource) {
- ScanTableSource.ScanRuntimeProvider provider =
-
scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
- if (provider instanceof SourceProvider) {
- return true;
- } else if (provider instanceof TransformationScanProvider) {
- Transformation<?> transformation =
- ((TransformationScanProvider) provider)
- .createTransformation(name -> Optional.empty());
- return transformation instanceof SourceTransformation;
- }
- // TODO supports more
- return false;
- }
-
//
--------------------------------------------------------------------------------------------
/** Creates a specialized node for assigning watermarks. */
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
index 79317c1f826..601f0298179 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
@@ -18,14 +18,18 @@
package org.apache.flink.table.planner.plan.rules.physical.batch;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
@@ -37,6 +41,7 @@ import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.utils.ShortcutUtils;
+import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
@@ -65,6 +70,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED;
@@ -177,7 +183,7 @@ public abstract class DynamicPartitionPruningRule extends
RelRule<RelRule.Config
return false;
}
- if (!DynamicSourceUtils.isNewSource((ScanTableSource) tableSource)) {
+ if (!isNewSource((ScanTableSource) tableSource)) {
return false;
}
@@ -189,6 +195,31 @@ public abstract class DynamicPartitionPruningRule extends
RelRule<RelRule.Config
return !acceptedFieldIndices.isEmpty();
}
+ /** Returns true if the source is a FLIP-27 source, else false. */
+ private static boolean isNewSource(ScanTableSource scanTableSource) {
+ ScanTableSource.ScanRuntimeProvider provider =
+
scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ if (provider instanceof SourceProvider) {
+ return true;
+ } else if (provider instanceof TransformationScanProvider) {
+ Transformation<?> transformation =
+ ((TransformationScanProvider) provider)
+ .createTransformation(name -> Optional.empty());
+ return transformation instanceof SourceTransformation;
+ } else if (provider instanceof DataStreamScanProvider) {
+ // Assume that the DataStreamScanProvider of a source that supports dynamic
filtering uses a new
+ // Source. This assumption is not reliable and should be checked.
+ // TODO FLINK-28864 check if the source used by the
DataStreamScanProvider is actually a
+ // new source.
+ // This situation will not generate a wrong result because it's
handled when translating
+ // BatchTableSourceScan. The only effect is that the physical plan and
the exec node plan
+ // have DPP nodes that do not work at runtime.
+ return true;
+ }
+ // TODO supports more
+ return false;
+ }
+
private static List<Integer> getAcceptedFieldIndices(
List<Integer> factJoinKeys,
@Nullable BatchPhysicalCalc factCalc,