boneanxs commented on code in PR #6725:
URL: https://github.com/apache/hudi/pull/6725#discussion_r1001578617


##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean 
tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If no filter left in the filterBuilder, will simply add it.
+   * 2. If filterBuilder is not empty, will combine the existing filters
+   *    and new adding filter with operator if the filter is not empty,
+   *    otherwise return empty for OR operator; while return left side
+   *    for ADD operator. Wrap these filters with parenthesis.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, 
Option<String> filter) {

Review Comment:
   Makes sense, I will change this in the next few days.



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean 
tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If no filter left in the filterBuilder, will simply add it.
+   * 2. If filterBuilder is not empty, will combine the existing filters
+   *    and new adding filter with operator if the filter is not empty,
+   *    otherwise return empty for OR operator; while return left side
+   *    for ADD operator. Wrap these filters with parenthesis.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, 
Option<String> filter) {
+    ValidationUtils.checkArgument(operator.equals("OR") || 
operator.equals("AND"),
+        "Filters can only accept `OR` or `AND` operator");
+    boolean shouldEnclosed = false;
+    if (filter.isPresent() && !filter.get().isEmpty()) {
+      if (filterBuilder.length() != 0) {
+        // If filterBuilder is not empty, which means there already has some 
filters
+        // in the filterBuilder, we need to use operator to combine already 
existed filters
+        // and the new filter.
+        filterBuilder.insert(0, "(");
+        filterBuilder.append(" " + operator + " ");
+        shouldEnclosed = true;
+      }
+      filterBuilder.append(filter.get());
+    } else {
+      if (operator.equals("OR")) {
+        filterBuilder.setLength(0);
+        return;
+      }
+    }
+
+    if (shouldEnclosed) {
+      filterBuilder.append(")");
+    }
+  }
+
+  private String quoteStringLiteral(String value) {
+    if (!value.contains("\"")) {
+      return "\"" + value + "\"";
+    } else if (!value.contains("'")) {
+      return "'" + value + "'";
+    } else {
+      throw new UnsupportedOperationException("Cannot pushdown filters if \" 
and ' both exist");
+    }
+  }
+
+  private Option<String> extractLiteralValue(String type, String value) {
+    switch (type.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return Option.of(quoteStringLiteral(value));
+      case HiveSchemaUtil.INT_TYPE_NAME:
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return Option.of(value);
+      default:
+        return Option.empty();
+    }
+  }
+
+  private Option<String> generateEqualFilter(String key, String type, String 
value) {
+    Option<String> extracted = extractLiteralValue(type, value);
+    if (extracted.isPresent()) {
+      return Option.of(key + " = " + extracted.get());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Visit the partition and generate filters.
+   *
+   * Examples:
+   * 1. date=2022-09-20 => date = 2022-09-20
+   * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+   * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND 
min = 30)
+   */
+  private String visitPartition(List<String> partitionKeys,
+                                Map<String, String> keyWithTypes,
+                                List<String> partitionValues) {
+    if (partitionKeys.size() != partitionValues.size()) {
+      throw new HoodieHiveSyncException("Partition key and values should be 
same length"
+          + ", but got partitionKey: " + partitionKeys + " with values: " + 
partitionValues);
+    }
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionValues.size(); i++) {
+      String key = partitionKeys.get(i);
+      combineFilter(filterBuilder, "AND",
+          generateEqualFilter(key, keyWithTypes.get(key), 
partitionValues.get(i)));
+    }
+
+    return filterBuilder.toString();
+  }
+
+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> 
partitionVals) {
+    // Hive store columns to lowercase, so we need to map partitions to 
lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = 
syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());
+
+    if (partitionTypes.size() == 0) {
+      throw new HoodieHiveSyncException("Cannot get partition types from 
SyncClient, maybe "
+          + "table schema is not synced");
+    }
+
+    Map<String, String> keyWithTypes = 
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionVals.size(); i++) {
+      combineFilter(filterBuilder, "OR",
+          Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes, 
partitionVals.get(i))));
+    }
+    return filterBuilder.toString();
+  }
+
+  /**
+   * Fetch partitions from meta service, will try to push down more filters to 
avoid fetching
+   * too many unnecessary partitions.
+   */
+  private List<Partition> getTablePartitions(String tableName, List<String> 
writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+        .filter(values -> !values.isEmpty())
+        .collect(Collectors.toList());
+
+    if (partitionVals.isEmpty()) {
+      // No partition is written
+      return Collections.emptyList();
+    }
+
+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > 
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {

Review Comment:
   If too many filters are pushed down to the backend Hive Metastore (HMS)
   server, HMS can throw a StackOverflowError, so we need to limit the number
   of filters here. Whether this happens depends on the stack size limit
   (-Xss) configured for HMS.
   
   Here I follow the Spark community's default value for
   `spark.sql.hive.metastorePartitionPruningInSetThreshold`, which is 1000.



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean 
tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If no filter left in the filterBuilder, will simply add it.
+   * 2. If filterBuilder is not empty, will combine the existing filters
+   *    and new adding filter with operator if the filter is not empty,
+   *    otherwise return empty for OR operator; while return left side
+   *    for ADD operator. Wrap these filters with parenthesis.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, 
Option<String> filter) {
+    ValidationUtils.checkArgument(operator.equals("OR") || 
operator.equals("AND"),
+        "Filters can only accept `OR` or `AND` operator");
+    boolean shouldEnclosed = false;
+    if (filter.isPresent() && !filter.get().isEmpty()) {
+      if (filterBuilder.length() != 0) {
+        // If filterBuilder is not empty, which means there already has some 
filters
+        // in the filterBuilder, we need to use operator to combine already 
existed filters
+        // and the new filter.
+        filterBuilder.insert(0, "(");
+        filterBuilder.append(" " + operator + " ");
+        shouldEnclosed = true;
+      }
+      filterBuilder.append(filter.get());
+    } else {
+      if (operator.equals("OR")) {
+        filterBuilder.setLength(0);
+        return;
+      }
+    }
+
+    if (shouldEnclosed) {
+      filterBuilder.append(")");
+    }
+  }
+
+  private String quoteStringLiteral(String value) {
+    if (!value.contains("\"")) {
+      return "\"" + value + "\"";
+    } else if (!value.contains("'")) {
+      return "'" + value + "'";
+    } else {
+      throw new UnsupportedOperationException("Cannot pushdown filters if \" 
and ' both exist");
+    }
+  }
+
+  private Option<String> extractLiteralValue(String type, String value) {
+    switch (type.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return Option.of(quoteStringLiteral(value));
+      case HiveSchemaUtil.INT_TYPE_NAME:
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return Option.of(value);
+      default:
+        return Option.empty();
+    }
+  }
+
+  private Option<String> generateEqualFilter(String key, String type, String 
value) {
+    Option<String> extracted = extractLiteralValue(type, value);
+    if (extracted.isPresent()) {
+      return Option.of(key + " = " + extracted.get());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Visit the partition and generate filters.
+   *
+   * Examples:
+   * 1. date=2022-09-20 => date = 2022-09-20
+   * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+   * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND 
min = 30)
+   */
+  private String visitPartition(List<String> partitionKeys,
+                                Map<String, String> keyWithTypes,
+                                List<String> partitionValues) {
+    if (partitionKeys.size() != partitionValues.size()) {
+      throw new HoodieHiveSyncException("Partition key and values should be 
same length"
+          + ", but got partitionKey: " + partitionKeys + " with values: " + 
partitionValues);
+    }
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionValues.size(); i++) {
+      String key = partitionKeys.get(i);
+      combineFilter(filterBuilder, "AND",
+          generateEqualFilter(key, keyWithTypes.get(key), 
partitionValues.get(i)));
+    }
+
+    return filterBuilder.toString();
+  }
+
+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> 
partitionVals) {
+    // Hive store columns to lowercase, so we need to map partitions to 
lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = 
syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());
+
+    if (partitionTypes.size() == 0) {
+      throw new HoodieHiveSyncException("Cannot get partition types from 
SyncClient, maybe "
+          + "table schema is not synced");
+    }
+
+    Map<String, String> keyWithTypes = 
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionVals.size(); i++) {
+      combineFilter(filterBuilder, "OR",
+          Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes, 
partitionVals.get(i))));
+    }
+    return filterBuilder.toString();
+  }
+
+  /**
+   * Fetch partitions from meta service, will try to push down more filters to 
avoid fetching
+   * too many unnecessary partitions.
+   */
+  private List<Partition> getTablePartitions(String tableName, List<String> 
writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+        .filter(values -> !values.isEmpty())
+        .collect(Collectors.toList());
+
+    if (partitionVals.isEmpty()) {
+      // No partition is written
+      return Collections.emptyList();
+    }
+
+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > 
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
+      return syncClient.getAllPartitions(tableName);

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to