boneanxs commented on code in PR #6725:
URL: https://github.com/apache/hudi/pull/6725#discussion_r1001578617
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
Review Comment:
Make sense, will change this these days.
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+ List<List<String>> partitionVals = writtenPartitionsSince
+ .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+ .filter(values -> !values.isEmpty())
+ .collect(Collectors.toList());
+
+ if (partitionVals.isEmpty()) {
+ // No partition is written
+ return Collections.emptyList();
+ }
+
+ int estimateSize = partitionKeys.size() * partitionVals.size();
+ if (estimateSize >
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
Review Comment:
If there are too many filters pushing down to the backend hive metastore
server, it could cause HMS throw stackoverflow error, so we need to limit
filters here. It depends on the HMS setting for the stack size limit(-Xss).
Here I follow spark community default value
for`spark.sql.hive.metastorePartitionPruningInSetThreshold`, which is 1000
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+ List<List<String>> partitionVals = writtenPartitionsSince
+ .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+ .filter(values -> !values.isEmpty())
+ .collect(Collectors.toList());
+
+ if (partitionVals.isEmpty()) {
+ // No partition is written
+ return Collections.emptyList();
+ }
+
+ int estimateSize = partitionKeys.size() * partitionVals.size();
+ if (estimateSize >
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
+ return syncClient.getAllPartitions(tableName);
Review Comment:
sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]