godfreyhe commented on code in PR #20596:
URL: https://github.com/apache/flink/pull/20596#discussion_r950007764


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -252,48 +250,41 @@ public void applyPartitions(List<Map<String, String>> remainingPartitions) {
     }
 
     @Override
-    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-        if (catalogTable.getPartitionKeys() != null
-                && catalogTable.getPartitionKeys().size() != 0) {
-            checkArgument(
-                    !candidateFilterFields.isEmpty(),
-                    "At least one field should be provided for dynamic filtering");
-
-            // only accept partition fields of supported types to do dynamic partition pruning
-            List<String> dynamicFilterPartitionKeys = new ArrayList<>();
-            for (String field : candidateFilterFields) {
-                if (catalogTable.getPartitionKeys().contains(field)
-                        && HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
-                                catalogTable
-                                        .getSchema()
-                                        .getFieldDataType(field)
-                                        .map(DataType::getLogicalType)
-                                        .map(LogicalType::getTypeRoot)
-                                        .orElse(null))) {
-                    dynamicFilterPartitionKeys.add(field);
-                }
-            }
-            if (dynamicFilterPartitionKeys.isEmpty()) {
-                LOG.warn(
-                        "No dynamic filter field is accepted,"
-                                + " only partition fields can use for dynamic filtering.");
-            }
+    public List<String> listAcceptedFilterFields() {
+        return catalogTable.getPartitionKeys();
+    }
 
-            // sort before check to ensure the lists have same elements in same order
-            dynamicFilterPartitionKeys.sort(String::compareTo);
-            checkState(
-                    this.dynamicFilterPartitionKeys == null
-                            || this.dynamicFilterPartitionKeys.equals(dynamicFilterPartitionKeys),
-                    "Dynamic filtering is applied twice but with different keys: %s != %s",
-                    this.dynamicFilterPartitionKeys,
-                    dynamicFilterPartitionKeys);
-
-            this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
-            return dynamicFilterPartitionKeys;
-        } else {
-            LOG.warn("No dynamic filter field is accepted since the table is non-partitioned.");
-            return Collections.emptyList();
+    @Override
+    public void applyDynamicFiltering(List<String> candidateFilterFields) {
+        // only accept partition fields of supported types to do dynamic partition pruning
+        List<String> dynamicFilterPartitionKeys = new ArrayList<>();
+        for (String field : candidateFilterFields) {
+            if (HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
+                    catalogTable
+                            .getSchema()
+                            .getFieldDataType(field)
+                            .map(DataType::getLogicalType)
+                            .map(LogicalType::getTypeRoot)
+                            .orElse(null))) {
+                dynamicFilterPartitionKeys.add(field);
+            }
+        }
         }

Review Comment:
   Move this logic into the `listAcceptedFilterFields` method.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -60,9 +60,16 @@
 public interface SupportsDynamicFiltering {
 
     /**
-     * Applies the candidate filter fields into the table source, and return the accepted fields.
-     * The data corresponding the filter fields will be provided in runtime, which can be used to
-     * filter the partitions or the input data.
+     * Return the filter fields this partition table source supported. This method is used in
+     * planner, which can be used to judge whether the source table is a suitable partition table
+     * for filtering partitions.
      */
-    List<String> applyDynamicFiltering(List<String> candidateFilterFields);
+    List<String> listAcceptedFilterFields();
+
+    /**
+     * Applies the candidate filter fields into the table source. The data corresponding the filter
+     * fields will be provided in runtime, which can be used to filter the partitions or the input
+     * data.
+     */

Review Comment:
   /**
    * Applies the candidate filter fields into the table source. The data corresponding to the
    * filter fields will be provided at runtime, which can be used to filter the partitions or
    * the input data.
    *
    * <p>NOTE: the candidate filter fields are always from the result of {@link
    * #listAcceptedFilterFields()}.
    */



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsDynamicFiltering.java:
##########
@@ -60,9 +60,16 @@
 public interface SupportsDynamicFiltering {
 
     /**
-     * Applies the candidate filter fields into the table source, and return the accepted fields.
-     * The data corresponding the filter fields will be provided in runtime, which can be used to
-     * filter the partitions or the input data.
+     * Return the filter fields this partition table source supported. This method is used in
+     * planner, which can be used to judge whether the source table is a suitable partition table
+     * for filtering partitions.

Review Comment:
   /**
    * Returns the filter fields this partition table source supports. This method tells the
    * planner which fields can be used as dynamic filtering fields; the planner will pick some
    * of the returned fields based on the query and create the dynamic filtering operator.
    */



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -252,48 +250,41 @@ public void applyPartitions(List<Map<String, String>> remainingPartitions) {
     }
 
     @Override
-    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-        if (catalogTable.getPartitionKeys() != null
-                && catalogTable.getPartitionKeys().size() != 0) {
-            checkArgument(
-                    !candidateFilterFields.isEmpty(),
-                    "At least one field should be provided for dynamic filtering");
-
-            // only accept partition fields of supported types to do dynamic partition pruning
-            List<String> dynamicFilterPartitionKeys = new ArrayList<>();
-            for (String field : candidateFilterFields) {
-                if (catalogTable.getPartitionKeys().contains(field)
-                        && HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
-                                catalogTable
-                                        .getSchema()
-                                        .getFieldDataType(field)
-                                        .map(DataType::getLogicalType)
-                                        .map(LogicalType::getTypeRoot)
-                                        .orElse(null))) {
-                    dynamicFilterPartitionKeys.add(field);
-                }
-            }
-            if (dynamicFilterPartitionKeys.isEmpty()) {
-                LOG.warn(
-                        "No dynamic filter field is accepted,"
-                                + " only partition fields can use for dynamic filtering.");
-            }
+    public List<String> listAcceptedFilterFields() {
+        return catalogTable.getPartitionKeys();
+    }
 
-            // sort before check to ensure the lists have same elements in same order
-            dynamicFilterPartitionKeys.sort(String::compareTo);
-            checkState(
-                    this.dynamicFilterPartitionKeys == null
-                            || this.dynamicFilterPartitionKeys.equals(dynamicFilterPartitionKeys),
-                    "Dynamic filtering is applied twice but with different keys: %s != %s",
-                    this.dynamicFilterPartitionKeys,
-                    dynamicFilterPartitionKeys);
-
-            this.dynamicFilterPartitionKeys = dynamicFilterPartitionKeys;
-            return dynamicFilterPartitionKeys;
-        } else {

Review Comment:
   If the table is not partitioned, we can throw an exception directly.
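
   A minimal sketch of what this suggestion could look like, using a stand-in for `catalogTable` (the real change would live in `HiveTableSource`; the exception type and message here are illustrative, not from the PR):

   ```java
   import java.util.List;

   public class NonPartitionedCheckSketch {
       // Stand-in for catalogTable.getPartitionKeys(); null/empty means non-partitioned.
       private final List<String> partitionKeys;

       NonPartitionedCheckSketch(List<String> partitionKeys) {
           this.partitionKeys = partitionKeys;
       }

       // Per the review: fail fast for non-partitioned tables instead of
       // logging a warning and returning an empty list.
       public List<String> listAcceptedFilterFields() {
           if (partitionKeys == null || partitionKeys.isEmpty()) {
               throw new UnsupportedOperationException(
                       "Dynamic filtering is not supported for non-partitioned tables.");
           }
           return partitionKeys;
       }

       public static void main(String[] args) {
           System.out.println(new NonPartitionedCheckSketch(List.of("dt")).listAcceptedFilterFields());
           try {
               new NonPartitionedCheckSketch(List.of()).listAcceptedFilterFields();
           } catch (UnsupportedOperationException e) {
               System.out.println("threw: " + e.getMessage());
           }
       }
   }
   ```

   Failing fast makes a planner bug (calling the ability on a non-partitioned source) visible immediately rather than silently disabling the optimization.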



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -171,7 +171,29 @@ private static void visitFactSide(
                     joinKeys.stream()
                             .map(i -> scan.getRowType().getFieldNames().get(i))
                             .collect(Collectors.toList());
-            factSideFactors.isSuitableFactScanSource = !candidateFields.isEmpty();
+            if (candidateFields.isEmpty()) {
+                factSideFactors.isSuitableFactScanSource = false;
+                return;
+            }
+
+            List<String> acceptedFilterFields =
+                    ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+            if (acceptedFilterFields == null) {

Review Comment:
   `acceptedFilterFields == null || acceptedFilterFields.isEmpty()`



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -252,48 +250,41 @@ public void applyPartitions(List<Map<String, String>> remainingPartitions) {
     }
 
     @Override
-    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
-        if (catalogTable.getPartitionKeys() != null
-                && catalogTable.getPartitionKeys().size() != 0) {
-            checkArgument(
-                    !candidateFilterFields.isEmpty(),
-                    "At least one field should be provided for dynamic filtering");
-
-            // only accept partition fields of supported types to do dynamic partition pruning
-            List<String> dynamicFilterPartitionKeys = new ArrayList<>();
-            for (String field : candidateFilterFields) {
-                if (catalogTable.getPartitionKeys().contains(field)
-                        && HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.contains(
-                                catalogTable
-                                        .getSchema()
-                                        .getFieldDataType(field)
-                                        .map(DataType::getLogicalType)
-                                        .map(LogicalType::getTypeRoot)
-                                        .orElse(null))) {
-                    dynamicFilterPartitionKeys.add(field);
-                }
-            }
-            if (dynamicFilterPartitionKeys.isEmpty()) {
-                LOG.warn(
-                        "No dynamic filter field is accepted,"
-                                + " only partition fields can use for dynamic filtering.");
-            }
+    public List<String> listAcceptedFilterFields() {
+        return catalogTable.getPartitionKeys();

Review Comment:
   Only return the partition keys whose types are supported.
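
   A sketch of the suggested filtering, with the catalog lookup replaced by a plain field-to-type map so it is self-contained (the type-root names stand in for the real `LogicalTypeRoot` constants in `HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES`):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class SupportedTypeFilterSketch {
       // Stand-in for HiveSourceDynamicFileEnumerator.SUPPORTED_TYPES.
       static final Set<String> SUPPORTED_TYPES = Set.of("INTEGER", "BIGINT", "VARCHAR", "DATE");

       // Per the review: listAcceptedFilterFields should return only the partition
       // keys whose types the dynamic file enumerator can actually handle, so the
       // planner never offers a field the source would later reject.
       static List<String> listAcceptedFilterFields(
               List<String> partitionKeys, Map<String, String> fieldTypes) {
           List<String> accepted = new ArrayList<>();
           for (String key : partitionKeys) {
               if (SUPPORTED_TYPES.contains(fieldTypes.get(key))) {
                   accepted.add(key);
               }
           }
           return accepted;
       }

       public static void main(String[] args) {
           // "bin" is dropped because its (hypothetical) BINARY type is unsupported.
           System.out.println(
                   listAcceptedFilterFields(
                           List.of("dt", "bin"), Map.of("dt", "DATE", "bin", "BINARY")));
       }
   }
   ```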



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/DynamicPartitionPruningRule.java:
##########
@@ -138,30 +137,26 @@ private static List<Integer> getAcceptedFieldIndices(
                             .map(i -> factScan.getRowType().getFieldNames().get(i))
                             .collect(Collectors.toList());
         }
+        List<String> acceptedFilterFieldsInSource =
+                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
 
-        List<String> acceptedFields =
-                ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(candidateFields);
-
-        if (acceptedFields == null) {
-            return new ArrayList<>();
-        }
-
-        for (String field : acceptedFields) {
-            if (!candidateFields.contains(field)) {
-                throw new TableException(
-                        String.format(
-                                "Field: %s does not exist in the given fields: %s, "
-                                        + "please verify the applyDynamicFiltering method in connector: %s",
-                                field, candidateFields, tableSource.asSummaryString()));
+        List<String> acceptedFilterFields = new ArrayList<>();
+        for (String candidateField : candidateFields) {
+            if (acceptedFilterFieldsInSource.contains(candidateField)) {
+                acceptedFilterFields.add(candidateField);
             }
         }

Review Comment:
   Extract a common method that can be reused by `DynamicPartitionPruningRule`.
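
   The candidate/accepted intersection is computed in both the rule and `DynamicPartitionPruningUtils`, so a shared helper could look roughly like this (class name and location are hypothetical):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class AcceptedFieldsUtil {
       // Hypothetical shared helper: keep only the candidate fields the source
       // accepts, preserving the candidates' order. Both call sites could then
       // delegate to this single method instead of duplicating the loop.
       public static List<String> intersect(
               List<String> candidateFields, List<String> acceptedFields) {
           List<String> result = new ArrayList<>();
           for (String field : candidateFields) {
               if (acceptedFields.contains(field)) {
                   result.add(field);
               }
           }
           return result;
       }

       public static void main(String[] args) {
           System.out.println(intersect(List.of("dt", "region", "id"), List.of("region", "dt")));
       }
   }
   ```

   Preserving the candidate order (rather than the accepted-list order) keeps the result stable with respect to the join keys the planner derived them from.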



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
