This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dc8890f1b8 [FLINK-28711][hive] Hive source supports dynamic filtering
0dc8890f1b8 is described below

commit 0dc8890f1b8164bc37bd8b209e6d4eef37386087
Author: Gen Luo <[email protected]>
AuthorDate: Tue Aug 2 20:16:58 2022 +0800

    [FLINK-28711][hive] Hive source supports dynamic filtering
    
    This closes #20415
---
 .../file/src/enumerate/DynamicFileEnumerator.java  |  48 +++++
 .../file/src/impl/DynamicFileSplitEnumerator.java  | 207 +++++++++++++++++++
 .../src/impl/DynamicFileSplitEnumeratorTest.java   | 190 ++++++++++++++++++
 .../hive/DynamicHiveSplitEnumerator.java           |  35 ++++
 .../apache/flink/connectors/hive/HiveSource.java   |  26 +++
 .../flink/connectors/hive/HiveSourceBuilder.java   |  10 +
 .../hive/HiveSourceDynamicFileEnumerator.java      | 221 +++++++++++++++++++++
 .../flink/connectors/hive/HiveTableSource.java     |  57 +++++-
 .../flink/connectors/hive/HiveDialectITCase.java   | 135 +++++++++++++
 .../hive/HiveSourceDynamicFileEnumeratorTest.java  | 188 ++++++++++++++++++
 .../connector/source/DynamicFilteringData.java     |   2 +
 .../planner/connectors/DynamicSourceUtils.java     |  20 --
 .../batch/DynamicPartitionPruningRule.java         |  35 +++-
 13 files changed, 1151 insertions(+), 23 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
new file mode 100644
index 00000000000..416c9b58deb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DynamicFileEnumerator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+
+/**
+ * {@code FileEnumerator} that supports dynamic filtering. The enumerator only 
enumerates splits
+ * that exist in the given {@link DynamicFilteringData}, and enumerates all 
splits if no
+ * DynamicFilteringData has been provided when #enumerateSplits is called.
+ */
+@PublicEvolving
+public interface DynamicFileEnumerator extends FileEnumerator {
+
+    /**
+     * Provides a {@link DynamicFilteringData} for filtering while the 
enumerator is enumerating
+     * splits.
+     *
+     * <p>The {@link DynamicFilteringData} is typically collected by a 
collector operator, and
+     * transferred here by a coordinating event. The method should never be 
called directly by
+     * users.
+     */
+    void setDynamicFilteringData(DynamicFilteringData data);
+
+    /** Factory for the {@link DynamicFileEnumerator}. */
+    @FunctionalInterface
+    interface Provider extends FileEnumerator.Provider {
+
+        DynamicFileEnumerator create();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
new file mode 100644
index 00000000000..bec6b6d904e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A SplitEnumerator implementation that supports dynamic filtering.
+ *
+ * <p>This enumerator handles {@link DynamicFilteringEvent} and filters out the 
desired input splits
+ * with the support of the {@link DynamicFileEnumerator}.
+ *
+ * <p>If the enumerator receives the first split request before any dynamic 
filtering data is
+ * received, it will enumerate all splits. If a DynamicFilteringEvent is received 
while the fully
+ * enumerated splits are being assigned, the remaining splits will be filtered accordingly.
+ */
+@Internal
+public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
+        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
+                SupportsHandleExecutionAttemptSourceEvent {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicFileSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<SplitT> context;
+
+    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
+
+    private final FileSplitAssigner.Provider splitAssignerFactory;
+
+    /**
+     * Stores the ids of splits that have been assigned. The split assigner may 
be rebuilt when a
+     * DynamicFilteringEvent is received. After that, splits that are 
already assigned may be
+     * offered by the rebuilt assigner a second time. We have to retain this state and filter 
out the splits that have
+     * been assigned with this set.
+     */
+    private final Set<String> assignedSplits;
+
+    private transient Set<String> allEnumeratingSplits;
+
+    private transient FileSplitAssigner splitAssigner;
+
+    // ------------------------------------------------------------------------
+
+    public DynamicFileSplitEnumerator(
+            SplitEnumeratorContext<SplitT> context,
+            DynamicFileEnumerator.Provider fileEnumeratorFactory,
+            FileSplitAssigner.Provider splitAssignerFactory) {
+        this.context = checkNotNull(context);
+        this.splitAssignerFactory = checkNotNull(splitAssignerFactory);
+        this.fileEnumeratorFactory = checkNotNull(fileEnumeratorFactory);
+        this.assignedSplits = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        // no resources to start
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon 
registration
+    }
+
+    @Override
+    public void handleSplitRequest(int subtask, @Nullable String hostname) {
+        if (!context.registeredReaders().containsKey(subtask)) {
+            // reader failed between sending the request and now. skip this 
request.
+            return;
+        }
+
+        if (splitAssigner == null) {
+            // No DynamicFilteringData is received before the first split 
request,
+            // create a split assigner that handles all splits
+            createSplitAssigner(null);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            final String hostInfo =
+                    hostname == null ? "(no host locality info)" : "(on host 
'" + hostname + "')";
+            LOG.debug("Subtask {} {} is requesting a file source split", 
subtask, hostInfo);
+        }
+
+        final Optional<FileSourceSplit> nextSplit = 
getNextUnassignedSplit(hostname);
+        if (nextSplit.isPresent()) {
+            final FileSourceSplit split = nextSplit.get();
+            context.assignSplit((SplitT) split, subtask);
+            assignedSplits.add(split.splitId());
+            LOG.debug("Assigned split to subtask {} : {}", subtask, split);
+        } else {
+            context.signalNoMoreSplits(subtask);
+            LOG.info("No more splits available for subtask {}", subtask);
+        }
+    }
+
+    private Optional<FileSourceSplit> getNextUnassignedSplit(String hostname) {
+        Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+        while (nextSplit.isPresent()) {
+            FileSourceSplit split = nextSplit.get();
+            // ignore the split if it has been assigned
+            if (!assignedSplits.contains(split.splitId())) {
+                return nextSplit;
+            }
+            nextSplit = splitAssigner.getNext(hostname);
+        }
+        return nextSplit;
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof DynamicFilteringEvent) {
+            LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);
+            createSplitAssigner(((DynamicFilteringEvent) 
sourceEvent).getData());
+        } else {
+            LOG.error("Received unrecognized event: {}", sourceEvent);
+        }
+    }
+
+    private void createSplitAssigner(@Nullable DynamicFilteringData 
dynamicFilteringData) {
+        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
+        if (dynamicFilteringData != null) {
+            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
+        }
+        Collection<FileSourceSplit> splits;
+        try {
+            splits = fileEnumerator.enumerateSplits(new Path[1], 
context.currentParallelism());
+            allEnumeratingSplits =
+                    
splits.stream().map(FileSourceSplit::splitId).collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not enumerate file splits", 
e);
+        }
+        splitAssigner = splitAssignerFactory.create(splits);
+    }
+
+    @Override
+    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
+        LOG.debug("Dynamic File Source Enumerator adds splits back: {}", 
splits);
+        if (splitAssigner != null) {
+            List<FileSourceSplit> fileSplits = new ArrayList<>(splits);
+            // Only add back splits that are currently enumerated. A split may be filtered out after 
it is assigned.
+            fileSplits.removeIf(s -> 
!allEnumeratingSplits.contains(s.splitId()));
+            // Added splits should be removed from assignedSplits for 
re-assignment
+            fileSplits.forEach(s -> assignedSplits.remove(s.splitId()));
+            splitAssigner.addSplits(fileSplits);
+        }
+    }
+
+    @Override
+    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
+        throw new UnsupportedOperationException(
+                "DynamicFileSplitEnumerator only supports batch execution.");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, int attemptNumber, 
SourceEvent sourceEvent) {
+        // Only recognize events that don't care about the attemptNumber
+        handleSourceEvent(subtaskId, sourceEvent);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
new file mode 100644
index 00000000000..1e098c48609
--- /dev/null
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumeratorTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DynamicFileSplitEnumerator}. */
+class DynamicFileSplitEnumeratorTest {
+
+    @Test
+    void testEnumerating() {
+        String[] splits = new String[] {"0", "1", "2", "3", "4"};
+        MockSplitEnumeratorContext<TestSplit> context = new 
MockSplitEnumeratorContext<>(1);
+        context.registerReader(new ReaderInfo(0, ""));
+
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                new DynamicFileSplitEnumerator<>(
+                        context,
+                        () -> new TestDynamicFileEnumerator(splits, splits),
+                        SimpleSplitAssigner::new);
+
+        for (String ignored : splits) {
+            enumerator.handleSplitRequest(0, null);
+        }
+
+        
assertThat(getAssignedSplits(context)).containsExactlyInAnyOrder(splits);
+    }
+
+    @Test
+    void testDynamicFiltering() {
+        String[] splits = new String[] {"0", "1", "2", "3", "4"};
+        String[] remainingSplits = new String[] {"1", "3"};
+        MockSplitEnumeratorContext<TestSplit> context = new 
MockSplitEnumeratorContext<>(1);
+        context.registerReader(new ReaderInfo(0, ""));
+
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                new DynamicFileSplitEnumerator<>(
+                        context,
+                        () -> new TestDynamicFileEnumerator(splits, 
remainingSplits),
+                        SimpleSplitAssigner::new);
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+
+        enumerator.handleSplitRequest(0, null);
+        enumerator.handleSplitRequest(0, null);
+
+        
assertThat(getAssignedSplits(context)).containsExactlyInAnyOrder(remainingSplits);
+    }
+
+    @Test
+    void testReceiveDynamicFilteringDataAfterStarted() {
+        String[] splits = new String[] {"0", "1", "2", "3", "4"};
+        String[] remainingSplits = new String[] {"1", "3"};
+        MockSplitEnumeratorContext<TestSplit> context = new 
MockSplitEnumeratorContext<>(1);
+        context.registerReader(new ReaderInfo(0, ""));
+
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                new DynamicFileSplitEnumerator<>(
+                        context,
+                        () -> new TestDynamicFileEnumerator(splits, 
remainingSplits),
+                        SimpleSplitAssigner::new);
+
+        enumerator.handleSplitRequest(0, null);
+
+        List<String> alreadyAssigned = getAssignedSplits(context);
+
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+
+        // request more than 5-1=4 times to check whether a split will be 
assigned twice
+        for (int i = 0; i < 6; i++) {
+            enumerator.handleSplitRequest(0, null);
+        }
+
+        alreadyAssigned.addAll(Arrays.asList(remainingSplits));
+        assertThat(getAssignedSplits(context))
+                .containsExactlyInAnyOrder(
+                        
alreadyAssigned.stream().distinct().toArray(String[]::new));
+    }
+
+    @Test
+    void testAddSplitsBack() {
+        String[] splits = new String[] {"0", "1", "2", "3", "4"};
+        String[] remainingSplits = new String[] {"1", "3"};
+        MockSplitEnumeratorContext<TestSplit> context = new 
MockSplitEnumeratorContext<>(1);
+        context.registerReader(new ReaderInfo(0, ""));
+
+        DynamicFileSplitEnumerator<TestSplit> enumerator =
+                new DynamicFileSplitEnumerator<>(
+                        context,
+                        () -> new TestDynamicFileEnumerator(splits, 
remainingSplits),
+                        SimpleSplitAssigner::new);
+
+        for (String ignored : splits) {
+            enumerator.handleSplitRequest(0, null);
+        }
+
+        enumerator.handleSourceEvent(0, mockDynamicFilteringEvent());
+
+        enumerator.addSplitsBack(
+                
Arrays.stream(splits).map(TestSplit::new).collect(Collectors.toList()), 0);
+
+        for (String ignored : splits) {
+            enumerator.handleSplitRequest(0, null);
+        }
+
+        List<String> assignedSplits = getAssignedSplits(context);
+        assertThat(assignedSplits.subList(5, assignedSplits.size()))
+                .containsExactlyInAnyOrder(remainingSplits);
+    }
+
+    private static SourceEvent mockDynamicFilteringEvent() {
+        // Mock a DynamicFilteringData whose typeInfo and rowType are not 
used.
+        return new DynamicFilteringEvent(
+                new DynamicFilteringData(
+                        new GenericTypeInfo<>(RowData.class),
+                        RowType.of(),
+                        Collections.emptyList(),
+                        false));
+    }
+
+    private static List<String> 
getAssignedSplits(MockSplitEnumeratorContext<TestSplit> context) {
+        return context.getSplitsAssignmentSequence().stream()
+                .flatMap(s -> s.assignment().get(0).stream())
+                .map(FileSourceSplit::splitId)
+                .collect(Collectors.toList());
+    }
+
+    private static class TestSplit extends FileSourceSplit {
+        public TestSplit(String id) {
+            super(id, new Path(), 0, 0, 0L, 0);
+        }
+    }
+
+    private static class TestDynamicFileEnumerator implements 
DynamicFileEnumerator {
+        private final List<String> remainingSplits;
+        private List<String> enumeratingSplits;
+
+        private TestDynamicFileEnumerator(String[] allSplits, String[] 
remainingSplits) {
+            this.remainingSplits = Arrays.asList(remainingSplits);
+            this.enumeratingSplits = Arrays.asList(allSplits);
+        }
+
+        @Override
+        public void setDynamicFilteringData(DynamicFilteringData data) {
+            // Mock filtering result
+            enumeratingSplits = remainingSplits;
+        }
+
+        @Override
+        public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int 
minDesiredSplits) {
+            return 
enumeratingSplits.stream().map(TestSplit::new).collect(Collectors.toList());
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
new file mode 100644
index 00000000000..79b3db105b5
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/DynamicHiveSplitEnumerator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.connector.file.src.impl.DynamicFileSplitEnumerator;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+
+/** {@link DynamicFileSplitEnumerator} for hive source. */
+public class DynamicHiveSplitEnumerator extends 
DynamicFileSplitEnumerator<HiveSourceSplit> {
+    public DynamicHiveSplitEnumerator(
+            SplitEnumeratorContext<HiveSourceSplit> context,
+            DynamicFileEnumerator.Provider fileEnumeratorFactory,
+            FileSplitAssigner.Provider splitAssignerFactory) {
+        super(context, fileEnumeratorFactory, splitAssignerFactory);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index d7f52098761..3f764c0b05a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -62,6 +62,10 @@ public class HiveSource<T> extends AbstractFileSource<T, 
HiveSourceSplit> {
     private final int threadNum;
     private final JobConfWrapper jobConfWrapper;
     private final List<String> partitionKeys;
+
+    private final String hiveVersion;
+    private final List<String> dynamicFilterPartitionKeys;
+    private final List<HiveTablePartition> partitions;
     private final ContinuousPartitionFetcher<Partition, ?> fetcher;
     private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> 
fetcherContext;
     private final ObjectPath tablePath;
@@ -76,6 +80,9 @@ public class HiveSource<T> extends AbstractFileSource<T, 
HiveSourceSplit> {
             JobConf jobConf,
             ObjectPath tablePath,
             List<String> partitionKeys,
+            String hiveVersion,
+            @Nullable List<String> dynamicFilterPartitionKeys,
+            List<HiveTablePartition> partitions,
             @Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
             @Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> 
fetcherContext) {
         super(
@@ -92,6 +99,9 @@ public class HiveSource<T> extends AbstractFileSource<T, 
HiveSourceSplit> {
         this.jobConfWrapper = new JobConfWrapper(jobConf);
         this.tablePath = tablePath;
         this.partitionKeys = partitionKeys;
+        this.hiveVersion = hiveVersion;
+        this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
+        this.partitions = partitions;
         this.fetcher = fetcher;
         this.fetcherContext = fetcherContext;
     }
@@ -120,6 +130,8 @@ public class HiveSource<T> extends AbstractFileSource<T, 
HiveSourceSplit> {
                     fetcherContext.getConsumeStartOffset(),
                     Collections.emptyList(),
                     Collections.emptyList());
+        } else if (dynamicFilterPartitionKeys != null) {
+            return createDynamicSplitEnumerator(enumContext);
         } else {
             return super.createEnumerator(enumContext);
         }
@@ -169,4 +181,18 @@ public class HiveSource<T> extends AbstractFileSource<T, 
HiveSourceSplit> {
                 fetcher,
                 fetcherContext);
     }
+
+    private SplitEnumerator<HiveSourceSplit, 
PendingSplitsCheckpoint<HiveSourceSplit>>
+            
createDynamicSplitEnumerator(SplitEnumeratorContext<HiveSourceSplit> 
enumContext) {
+        return new DynamicHiveSplitEnumerator(
+                enumContext,
+                new HiveSourceDynamicFileEnumerator.Provider(
+                        tablePath.getFullName(),
+                        dynamicFilterPartitionKeys,
+                        partitions,
+                        threadNum,
+                        hiveVersion,
+                        jobConfWrapper),
+                getAssignerFactory());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 4afff3b181e..185904d7c2e 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -91,6 +91,7 @@ public class HiveSourceBuilder {
     private int[] projectedFields;
     private Long limit;
     private List<HiveTablePartition> partitions;
+    private List<String> dynamicFilterPartitionKeys;
 
     /**
      * Creates a builder to read a hive table.
@@ -246,6 +247,9 @@ public class HiveSourceBuilder {
                 jobConf,
                 tablePath,
                 partitionKeys,
+                hiveVersion,
+                dynamicFilterPartitionKeys,
+                partitions,
                 fetcher,
                 fetcherContext);
     }
@@ -259,6 +263,12 @@ public class HiveSourceBuilder {
         return this;
     }
 
+    public HiveSourceBuilder setDynamicFilterPartitionKeys(
+            List<String> dynamicFilterPartitionKeys) {
+        this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
+        return this;
+    }
+
     /** Sets the maximum number of records this source should return. */
     public HiveSourceBuilder setLimit(Long limit) {
         this.limit = limit;
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
new file mode 100644
index 00000000000..d96999c971f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumerator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
+import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.connectors.hive.util.JobConfUtils;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import 
org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter;
+import 
org.apache.flink.table.data.util.DataFormatConverters.LocalDateTimeConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicFileEnumerator} implementation for hive source. It uses {@link
+ * HiveSourceFileEnumerator#createInputSplits} to generate splits like HiveSourceFileEnumerator, but
+ * only enumerates {@link HiveTablePartition}s that exist in the {@link DynamicFilteringData} if a
+ * DynamicFilteringData is provided.
+ */
+public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
+
+    /** Type roots of partition columns accepted for dynamic filtering; the set is read-only. */
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        // Note: also modify #createRowData if the SUPPORTED_TYPES is modified.
+        Set<LogicalTypeRoot> supportedTypes = new HashSet<>();
+        supportedTypes.add(LogicalTypeRoot.TINYINT);
+        supportedTypes.add(LogicalTypeRoot.SMALLINT);
+        supportedTypes.add(LogicalTypeRoot.INTEGER);
+        supportedTypes.add(LogicalTypeRoot.BIGINT);
+        supportedTypes.add(LogicalTypeRoot.CHAR);
+        supportedTypes.add(LogicalTypeRoot.VARCHAR);
+        supportedTypes.add(LogicalTypeRoot.DATE);
+        supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        // Expose a read-only view so that no caller can alter this shared constant.
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supportedTypes);
+    }
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveSourceDynamicFileEnumerator.class);
+
+    // Converter for timestamp partition values, created once rather than once per converted field.
+    private static final LocalDateTimeConverter TIMESTAMP_CONVERTER =
+            new LocalDateTimeConverter(9);
+
+    private final String table;
+    private final List<String> dynamicFilterPartitionKeys;
+    // For non-partition hive table, partitions only contains one partition which partitionValues is
+    // empty.
+    private final List<HiveTablePartition> allPartitions;
+    private final int threadNum;
+    private final JobConf jobConf;
+    private final String defaultPartitionName;
+    private final HiveShim hiveShim;
+    // Partitions that are actually enumerated; identical to allPartitions until filtering data
+    // is applied through #setDynamicFilteringData.
+    private List<HiveTablePartition> finalPartitions;
+
+    /**
+     * Creates an enumerator that initially enumerates all given partitions.
+     *
+     * @param table name of the table, used in log messages
+     * @param dynamicFilterPartitionKeys partition keys whose values are matched against the
+     *     dynamic filtering data, in the order of the fields of the filtering row type
+     * @param allPartitions all candidate partitions before dynamic filtering
+     * @param threadNum passed through to {@link HiveSourceFileEnumerator#createInputSplits}
+     * @param hiveVersion hive version used to load the {@link HiveShim}
+     * @param jobConf job configuration used to create the splits and to look up the default
+     *     partition name
+     */
+    public HiveSourceDynamicFileEnumerator(
+            String table,
+            List<String> dynamicFilterPartitionKeys,
+            List<HiveTablePartition> allPartitions,
+            int threadNum,
+            String hiveVersion,
+            JobConf jobConf) {
+        this.table = checkNotNull(table);
+        this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
+        this.allPartitions = checkNotNull(allPartitions);
+        this.threadNum = threadNum;
+        this.jobConf = checkNotNull(jobConf);
+        this.defaultPartitionName = JobConfUtils.getDefaultPartitionName(jobConf);
+        this.hiveShim = HiveShimLoader.loadHiveShim(checkNotNull(hiveVersion));
+        this.finalPartitions = this.allPartitions;
+    }
+
+    /**
+     * Narrows the partitions to enumerate to those whose values of the dynamic filter partition
+     * keys are contained in the given data. If the data is not filtering, all partitions are kept.
+     *
+     * @param data the dynamic filtering data; its row type must have exactly one field per
+     *     dynamic filter partition key
+     * @throws IllegalArgumentException if the field count of the row type differs from the number
+     *     of dynamic filter partition keys
+     */
+    public void setDynamicFilteringData(DynamicFilteringData data) {
+        LOG.debug("Filtering partitions of table {} based on the data: {}", table, data);
+        if (!data.isFiltering()) {
+            finalPartitions = allPartitions;
+            return;
+        }
+        RowType rowType = data.getRowType();
+        Preconditions.checkArgument(
+                rowType.getFieldCount() == dynamicFilterPartitionKeys.size(),
+                "The dynamic filtering data has %s fields but %s partition keys are expected.",
+                rowType.getFieldCount(),
+                dynamicFilterPartitionKeys.size());
+        // Collect into a local list and publish it only when complete, so that a failure while
+        // converting a partition does not leave a half-filtered result behind.
+        List<HiveTablePartition> remainingPartitions = new ArrayList<>();
+        for (HiveTablePartition partition : allPartitions) {
+            RowData partitionRow = createRowData(rowType, partition.getPartitionSpec());
+            if (data.contains(partitionRow)) {
+                remainingPartitions.add(partition);
+            }
+        }
+        finalPartitions = remainingPartitions;
+        LOG.info(
+                "Dynamic filtering table {}, original partition number is {}, remaining partition number {}",
+                table,
+                allPartitions.size(),
+                finalPartitions.size());
+    }
+
+    /**
+     * Builds the row used to probe the dynamic filtering data for one partition. Field {@code i}
+     * holds the value of the i-th dynamic filter partition key, restored from its string form and
+     * converted to the internal data format of the i-th field type of {@code rowType}.
+     *
+     * @throws UnsupportedOperationException if a field type is not one of {@link #SUPPORTED_TYPES}
+     */
+    @VisibleForTesting
+    RowData createRowData(RowType rowType, Map<String, String> partitionSpec) {
+        GenericRowData rowData = new GenericRowData(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            String value = partitionSpec.get(dynamicFilterPartitionKeys.get(i));
+            Object convertedValue =
+                    HivePartitionUtils.restorePartitionValueFromType(
+                            hiveShim, value, rowType.getTypeAt(i), defaultPartitionName);
+            // Note: also modify supported types if the switch is modified.
+            switch (rowType.getTypeAt(i).getTypeRoot()) {
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    // No further process is necessary.
+                    break;
+                case CHAR:
+                case VARCHAR:
+                    convertedValue = StringData.fromString((String) convertedValue);
+                    break;
+                case DATE:
+                    convertedValue =
+                            LocalDateConverter.INSTANCE.toInternal((LocalDate) convertedValue);
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    convertedValue =
+                            TIMESTAMP_CONVERTER.toInternal((LocalDateTime) convertedValue);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported type for dynamic filtering:" + rowType.getTypeAt(i));
+            }
+            rowData.setField(i, convertedValue);
+        }
+        return rowData;
+    }
+
+    /**
+     * Creates the splits of the partitions that remain after dynamic filtering. The {@code paths}
+     * argument is not used; the partitions given at construction determine what is read.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        return new ArrayList<>(
+                HiveSourceFileEnumerator.createInputSplits(
+                        minDesiredSplits, finalPartitions, threadNum, jobConf));
+    }
+
+    @VisibleForTesting
+    List<HiveTablePartition> getFinalPartitions() {
+        return finalPartitions;
+    }
+
+    /** A factory to create {@link HiveSourceDynamicFileEnumerator}. */
+    public static class Provider implements DynamicFileEnumerator.Provider {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String table;
+        private final List<String> dynamicFilterPartitionKeys;
+        private final List<HiveTablePartition> partitions;
+        private final int threadNum;
+        private final String hiveVersion;
+        private final JobConfWrapper jobConfWrapper;
+
+        public Provider(
+                String table,
+                List<String> dynamicFilterPartitionKeys,
+                List<HiveTablePartition> partitions,
+                int threadNum,
+                String hiveVersion,
+                JobConfWrapper jobConfWrapper) {
+            this.table = checkNotNull(table);
+            this.dynamicFilterPartitionKeys = checkNotNull(dynamicFilterPartitionKeys);
+            this.partitions = checkNotNull(partitions);
+            this.threadNum = threadNum;
+            this.hiveVersion = checkNotNull(hiveVersion);
+            this.jobConfWrapper = checkNotNull(jobConfWrapper);
+        }
+
+        @Override
+        public DynamicFileEnumerator create() {
+            return new HiveSourceDynamicFileEnumerator(
+                    table,
+                    dynamicFilterPartitionKeys,
+                    partitions,
+                    threadNum,
+                    hiveVersion,
+                    jobConfWrapper.conf());
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index cc407efcec0..a339502b016 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.table.connector.format.FileBasedStatisticsReportableInpu
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -60,6 +61,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -72,7 +74,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -82,6 +86,8 @@ import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.P
 import static 
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
 import static 
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
 import static 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** A TableSource implementation to read data from Hive tables. */
 public class HiveTableSource
@@ -89,7 +95,8 @@ public class HiveTableSource
                 SupportsPartitionPushDown,
                 SupportsProjectionPushDown,
                 SupportsLimitPushDown,
-                SupportsStatisticReport {
+                SupportsStatisticReport,
+                SupportsDynamicFiltering {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableSource.class);
     private static final String HIVE_TRANSFORMATION = "hive";
@@ -104,6 +111,7 @@ public class HiveTableSource
     // Remaining partition specs after partition pruning is performed. Null if 
pruning is not pushed
     // down.
     @Nullable private List<Map<String, String>> remainingPartitions = null;
+    @Nullable private List<String> dynamicFilterPartitionKeys = null;
     protected int[] projectedFields;
     private Long limit = null;
 
@@ -181,6 +189,7 @@ public class HiveTableSource
                             execEnv,
                             sourceBuilder
                                     .setPartitions(hivePartitionsToRead)
+                                    
.setDynamicFilterPartitionKeys(dynamicFilterPartitionKeys)
                                     .buildWithDefaultBulkFormat())
                     .setParallelism(parallelism);
         }
@@ -247,6 +256,51 @@ public class HiveTableSource
         }
     }
 
+    /**
+     * Selects, from the candidate fields, those that can be used for dynamic partition pruning:
+     * fields that are partition keys of this table and whose type root is contained in {@link
+     * HiveSourceDynamicFileEnumerator#SUPPORTED_TYPES}.
+     *
+     * @param candidateFilterFields fields proposed for dynamic filtering; must not be empty when
+     *     the table is partitioned
+     * @return the accepted fields in sorted order, or an empty list if the table is
+     *     non-partitioned
+     * @throws IllegalArgumentException if the table is partitioned and no candidate is given
+     * @throws IllegalStateException if dynamic filtering was applied before with different keys
+     */
+    @Override
+    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        if (partitionKeys == null || partitionKeys.isEmpty()) {
+            LOG.warn("No dynamic filter field is accepted since the table is non-partitioned.");
+            return Collections.emptyList();
+        }
+        checkArgument(
+                !candidateFilterFields.isEmpty(),
+                "At least one field should be provided for dynamic filtering");
+
+        // only accept partition fields of supported types to do dynamic partition pruning
+        List<String> acceptedKeys = new ArrayList<>();
+        for (String field : candidateFilterFields) {
+            if (!partitionKeys.contains(field)) {
+                continue;
+            }
+            // A field without a resolvable type is rejected instead of probing the set with null.
+            boolean supportedType =
+                    catalogTable
+                            .getSchema()
+                            .getFieldDataType(field)
+                            .map(DataType::getLogicalType)
+                            .map(LogicalType::getTypeRoot)
+                            .filter(HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES::contains)
+                            .isPresent();
+            if (supportedType) {
+                acceptedKeys.add(field);
+            }
+        }
+        if (acceptedKeys.isEmpty()) {
+            LOG.warn(
+                    "No dynamic filter field is accepted, only partition fields of supported"
+                            + " types can be used for dynamic filtering. Candidates: {}",
+                    candidateFilterFields);
+        }
+
+        // sort before check to ensure the lists have same elements in same order
+        acceptedKeys.sort(String::compareTo);
+        checkState(
+                this.dynamicFilterPartitionKeys == null
+                        || this.dynamicFilterPartitionKeys.equals(acceptedKeys),
+                "Dynamic filtering is applied twice but with different keys: %s != %s",
+                this.dynamicFilterPartitionKeys,
+                acceptedKeys);
+
+        this.dynamicFilterPartitionKeys = acceptedKeys;
+        return acceptedKeys;
+    }
+
     @Override
     public boolean supportsNestedProjection() {
         return false;
@@ -273,6 +327,7 @@ public class HiveTableSource
         source.remainingPartitions = remainingPartitions;
         source.projectedFields = projectedFields;
         source.limit = limit;
+        source.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
         return source;
     }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 0111e3e2f3d..aed6242c9f6 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.SqlDialect;
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -35,6 +37,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
@@ -1194,6 +1197,138 @@ public class HiveDialectITCase {
         }
     }
 
+    /**
+     * Verifies that joining a partitioned fact table with a filtered dim table is planned with a
+     * DynamicFilteringDataCollector and returns the expected rows, for both join orders and both
+     * without and with table statistics on the dim table.
+     */
+    @Test
+    public void testDynamicPartitionPruning() throws Exception {
+        // src table
+        tableEnv.executeSql("create table dim (x int,y string,z int)");
+        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+        // partitioned dest table
+        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+                .await();
+
+        tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
+        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+        String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
+        String sqlSwapFactDim =
+                "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";
+
+        String expected =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
+
+        // Check dynamic partition pruning is working
+        String plan = tableEnv.explainSql(sql);
+        assertThat(plan).contains("DynamicFilteringDataCollector");
+
+        plan = tableEnv.explainSql(sqlSwapFactDim);
+        assertThat(plan).contains("DynamicFilteringDataCollector");
+
+        // Validate results
+        List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        // Validate results with table statistics
+        tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+                .get()
+                .alterTableStatistics(
+                        new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
+                        new CatalogTableStatistics(3, -1, -1, -1),
+                        false);
+
+        results = queryResult(tableEnv.sqlQuery(sql));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+        assertThat(results.toString()).isEqualTo(expected);
+    }
+
+    /**
+     * Runs a union of two fact/dim joins, once with both branches using the same dim filter and
+     * once with different dim filters, checking the plan and the returned rows in each case.
+     */
+    @Test
+    public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
+        // dimension table
+        tableEnv.executeSql("create table dim (x int,y string,z int)");
+        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+        // first partitioned fact table
+        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+                .await();
+
+        // second partitioned fact table
+        tableEnv.executeSql(
+                "create table fact2 (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
+                .await();
+
+        tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
+        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+        // case 1: both branches filter dim the same way, so one collector is expected in the plan
+        String sharedFilterQuery =
+                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+                        + "union all "
+                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
+        String sharedFilterRows =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+                        + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
+                        + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";
+
+        assertThat(tableEnv.explainSql(sharedFilterQuery))
+                .containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
+        assertThat(queryResult(tableEnv.sqlQuery(sharedFilterQuery)).toString())
+                .isEqualTo(sharedFilterRows);
+
+        // case 2: the branches filter dim differently
+        String distinctFilterQuery =
+                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+                        + "union all "
+                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
+        String distinctFilterRows =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+                        + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
+
+        assertThat(tableEnv.explainSql(distinctFilterQuery))
+                .contains("DynamicFilteringDataCollector");
+        assertThat(queryResult(tableEnv.sqlQuery(distinctFilterQuery)).toString())
+                .isEqualTo(distinctFilterRows);
+    }
+
     private void verifyUnsupportedOperation(String ddl) {
         assertThatThrownBy(() -> tableEnv.executeSql(ddl))
                 .isInstanceOf(ValidationException.class)
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
new file mode 100644
index 00000000000..14a4288bb92
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveSourceDynamicFileEnumeratorTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import 
org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter;
+import 
org.apache.flink.table.data.util.DataFormatConverters.LocalDateTimeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for HiveSourceDynamicFileEnumerator. */
+class HiveSourceDynamicFileEnumeratorTest {
+
+    /** All partitions are kept initially; after filtering data arrives only matches remain. */
+    @Test
+    void testFiltering() {
+        List<Map<String, String>> partitionSpecs =
+                Arrays.asList(
+                        Collections.singletonMap("a", "31"), Collections.singletonMap("a", "32"));
+        HiveSourceDynamicFileEnumerator enumerator =
+                createTestEnumerator(Collections.singletonList("a"), partitionSpecs);
+
+        // before any filtering data is set, every partition is retained
+        assertThat(enumerator.getFinalPartitions()).hasSize(2);
+        List<Map<String, String>> initialSpecs =
+                enumerator.getFinalPartitions().stream()
+                        .map(HiveTablePartition::getPartitionSpec)
+                        .collect(Collectors.toList());
+        assertThat(initialSpecs).containsExactlyInAnyOrder(partitionSpecs.toArray(new Map[0]));
+
+        // filtering data containing the single row (31)
+        RowType rowType = RowType.of(new IntType());
+        TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        byte[] serializedRow = serialize(rowTypeInfo, GenericRowData.of(31));
+        enumerator.setDynamicFilteringData(
+                new DynamicFilteringData(
+                        rowTypeInfo, rowType, Collections.singletonList(serializedRow), true));
+
+        List<HiveTablePartition> remaining = enumerator.getFinalPartitions();
+        assertThat(remaining).hasSize(1);
+        assertThat(remaining.get(0).getPartitionSpec().get("a")).isEqualTo("31");
+    }
+
+    /** Each supported type is restored from its partition string into the internal format. */
+    @Test
+    void testCreateRowSupportedTypes() {
+        // f0: field type, f1: expected internal value, f2: partition value as string
+        List<Tuple3<LogicalType, Object, String>> cases = new ArrayList<>();
+        cases.add(new Tuple3<>(new IntType(), 42, "42"));
+        cases.add(new Tuple3<>(new BigIntType(), 9876543210L, "9876543210"));
+        cases.add(new Tuple3<>(new SmallIntType(), (short) 41, "41"));
+        cases.add(new Tuple3<>(new TinyIntType(), (byte) 40, "40"));
+        cases.add(new Tuple3<>(new VarCharType(), StringData.fromString("1234"), "1234"));
+        cases.add(new Tuple3<>(new CharType(), StringData.fromString("7"), "7"));
+        cases.add(
+                new Tuple3<>(
+                        new DateType(),
+                        LocalDateConverter.INSTANCE.toInternal(LocalDate.of(2022, 2, 22)),
+                        "2022-2-22"));
+        cases.add(
+                new Tuple3<>(
+                        new TimestampType(9),
+                        new LocalDateTimeConverter(9)
+                                .toInternal(LocalDateTime.of(2022, 2, 22, 22, 2, 20, 20222022)),
+                        "2022-2-22 22:02:20.020222022"));
+
+        // Default partition values
+        String defaultPartitionName = HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal;
+        cases.add(
+                new Tuple3<>(
+                        new VarCharType(),
+                        StringData.fromString(defaultPartitionName),
+                        defaultPartitionName));
+        cases.add(new Tuple3<>(new IntType(), null, defaultPartitionName));
+
+        // the i-th case becomes field i of the row, keyed by the string form of its index
+        LogicalType[] fieldTypes = cases.stream().map(t -> t.f0).toArray(LogicalType[]::new);
+        RowType rowType = RowType.of(fieldTypes);
+        List<String> keys = new ArrayList<>();
+        Map<String, String> spec = new HashMap<>();
+        for (int i = 0; i < cases.size(); i++) {
+            String key = String.valueOf(i);
+            keys.add(key);
+            spec.put(key, cases.get(i).f2);
+        }
+
+        RowData result =
+                createTestEnumerator(keys, Collections.emptyList()).createRowData(rowType, spec);
+
+        for (int i = 0; i < cases.size(); i++) {
+            LogicalType type = cases.get(i).f0;
+            Object expected = cases.get(i).f1;
+            FieldGetter getter = RowData.createFieldGetter(type, i);
+            assertThat(getter.getFieldOrNull(result))
+                    .withFailMessage(
+                            () ->
+                                    "Mismatching row type "
+                                            + type
+                                            + ", expected:"
+                                            + expected
+                                            + ", actual:"
+                                            + getter.getFieldOrNull(result))
+                    .isEqualTo(expected);
+        }
+    }
+
+    /** Creates an enumerator over one partition per given spec, with an empty table name. */
+    private HiveSourceDynamicFileEnumerator createTestEnumerator(
+            List<String> keys, List<Map<String, String>> partitionSpecs) {
+        List<HiveTablePartition> partitions =
+                partitionSpecs.stream()
+                        .map(
+                                spec ->
+                                        new HiveTablePartition(
+                                                new StorageDescriptor(), spec, new Properties()))
+                        .collect(Collectors.toList());
+        return new HiveSourceDynamicFileEnumerator(
+                "", keys, partitions, 1, HiveShimLoader.getHiveVersion(), new JobConf());
+    }
+
+    /** Serializes the row with the serializer created from the given type information. */
+    private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(buffer);
+        try {
+            typeInfo.createSerializer(new ExecutionConfig()).serialize(row, output);
+        } catch (IOException e) {
+            // throw as RuntimeException so the function can use in lambda
+            throw new RuntimeException(e);
+        }
+        return buffer.toByteArray();
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
index 0a81bffdd4c..f0be9d200e7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.connector.source;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,6 +44,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
+@PublicEvolving
 public class DynamicFilteringData implements Serializable {
 
     private final TypeInformation<RowData> typeInfo;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index c2497cfbb36..989a20c6670 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -19,10 +19,8 @@
 package org.apache.flink.table.planner.connectors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
@@ -38,7 +36,6 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
@@ -65,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -259,22 +255,6 @@ public final class DynamicSourceUtils {
         return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
     }
 
-    /** Returns true if the source is FLIP-27 source, else false. */
-    public static boolean isNewSource(ScanTableSource scanTableSource) {
-        ScanTableSource.ScanRuntimeProvider provider =
-                scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-        if (provider instanceof SourceProvider) {
-            return true;
-        } else if (provider instanceof TransformationScanProvider) {
-            Transformation<?> transformation =
-                    ((TransformationScanProvider) provider)
-                            .createTransformation(name -> Optional.empty());
-            return transformation instanceof SourceTransformation;
-        }
-        // TODO supports more
-        return false;
-    }
-
     // --------------------------------------------------------------------------------------------
 
     /** Creates a specialized node for assigning watermarks. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
index 79317c1f826..601f0298179 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java
@@ -18,14 +18,18 @@
 
 package org.apache.flink.table.planner.plan.rules.physical.batch;
 
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.connectors.TransformationScanProvider;
 import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCalc;
@@ -37,6 +41,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalRel
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalTableSourceScan;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
@@ -65,6 +70,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED;
@@ -177,7 +183,7 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
             return false;
         }
 
-        if (!DynamicSourceUtils.isNewSource((ScanTableSource) tableSource)) {
+        if (!isNewSource((ScanTableSource) tableSource)) {
             return false;
         }
 
@@ -189,6 +195,31 @@ public abstract class DynamicPartitionPruningRule extends RelRule<RelRule.Config
         return !acceptedFieldIndices.isEmpty();
     }
 
+    /** Returns true if the source is FLIP-27 source, else false. */
+    private static boolean isNewSource(ScanTableSource scanTableSource) {
+        ScanTableSource.ScanRuntimeProvider provider =
+                
scanTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        if (provider instanceof SourceProvider) {
+            return true;
+        } else if (provider instanceof TransformationScanProvider) {
+            Transformation<?> transformation =
+                    ((TransformationScanProvider) provider)
+                            .createTransformation(name -> Optional.empty());
+            return transformation instanceof SourceTransformation;
+        } else if (provider instanceof DataStreamScanProvider) {
+            // Suppose DataStreamScanProvider of sources that support dynamic 
filtering will use new
+            // Source. It's not reliable and should be checked.
+            // TODO FLINK-28864 check if the source used by the 
DataStreamScanProvider is actually a
+            //  new source.
+            // This situation will not generate wrong result because it's 
handled when translating
+            // BatchTableSourceScan. The only effect is the physical plan and 
the exec node plan
+            // have DPP nodes, but they do not work in runtime.
+            return true;
+        }
+        // TODO supports more
+        return false;
+    }
+
     private static List<Integer> getAcceptedFieldIndices(
             List<Integer> factJoinKeys,
             @Nullable BatchPhysicalCalc factCalc,

Reply via email to