xushiyan commented on code in PR #6725:
URL: https://github.com/apache/hudi/pull/6725#discussion_r996889670
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java:
##########
@@ -64,6 +64,12 @@ public class HiveSyncConfig extends HoodieSyncConfig {
public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC =
HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
public static final ConfigProperty<String> HIVE_SYNC_COMMENT =
HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
+ public static final ConfigProperty<Integer>
META_SYNC_FILTER_PUSHDOWN_MAX_SIZE = ConfigProperty
Review Comment:
better call it `HIVE_SYNC_` as it's specific for hive sync (residing in
HiveSyncConfig)
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
Review Comment:
can be a static helper in some other utils
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+ List<List<String>> partitionVals = writtenPartitionsSince
+ .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
Review Comment:
instead of nested lists, can we make use of
org.apache.hudi.sync.common.model.Partition model?
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
Review Comment:
let's align the name, should be called partitionFields, applies to other
involved API arg names too
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
Review Comment:
this and other new methods added below should be placed in a better class.
they are very specific to filter generation and don't belong to the sync tool
level. there should be specific helper used by hive sync client to encapsulate
all these logic.
##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java:
##########
@@ -1168,6 +1168,74 @@ public void testSyncWithoutDiffs(String syncMode) throws
Exception {
assertEquals(commitTime1,
hiveClient.getLastCommitTimeSynced(tableName).get());
}
+ @Test
+ public void testSyncWithPushDownFilters() {
Review Comment:
high-level feedback about testing: we need many more variation and corner
cases covered in UT before landing this feature: different partition val
extractors, hive style or not, multi/single/non-partition fields, etc.
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+ List<List<String>> partitionVals = writtenPartitionsSince
+ .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+ .filter(values -> !values.isEmpty())
+ .collect(Collectors.toList());
+
+ if (partitionVals.isEmpty()) {
+ // No partition is written
+ return Collections.emptyList();
+ }
+
+ int estimateSize = partitionKeys.size() * partitionVals.size();
+ if (estimateSize >
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
+ return syncClient.getAllPartitions(tableName);
Review Comment:
here we changed the default behavior for many users where filter size <
1000. We have to ensure the correctness of filter generation by having enough
UT coverage. So I'd suggest keep the existing sync all partitions still as
default and use filter only when people set some threshold, e.g., default = -1
(not using filter). then people set accordingly to their tables' partition
numbers
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
Review Comment:
quite some data wrangling here.. can we make use of and evolve models from
sync-common to encapsulate them? the code will not be easy to maintain if these
logic spread around in methods
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean
tableExists, boolean useRea
return schemaChanged;
}
+ /**
+ * Combine filter with existing filters.
+ *
+ * 1. If no filter left in the filterBuilder, will simply add it.
+ * 2. If filterBuilder is not empty, will combine the existing filters
+ * and new adding filter with operator if the filter is not empty,
+ * otherwise return empty for OR operator; while return left side
+ * for ADD operator. Wrap these filters with parenthesis.
+ */
+ private void combineFilter(StringBuilder filterBuilder, String operator,
Option<String> filter) {
+ ValidationUtils.checkArgument(operator.equals("OR") ||
operator.equals("AND"),
+ "Filters can only accept `OR` or `AND` operator");
+ boolean shouldEnclosed = false;
+ if (filter.isPresent() && !filter.get().isEmpty()) {
+ if (filterBuilder.length() != 0) {
+ // If filterBuilder is not empty, which means there already has some
filters
+ // in the filterBuilder, we need to use operator to combine already
existed filters
+ // and the new filter.
+ filterBuilder.insert(0, "(");
+ filterBuilder.append(" " + operator + " ");
+ shouldEnclosed = true;
+ }
+ filterBuilder.append(filter.get());
+ } else {
+ if (operator.equals("OR")) {
+ filterBuilder.setLength(0);
+ return;
+ }
+ }
+
+ if (shouldEnclosed) {
+ filterBuilder.append(")");
+ }
+ }
+
+ private String quoteStringLiteral(String value) {
+ if (!value.contains("\"")) {
+ return "\"" + value + "\"";
+ } else if (!value.contains("'")) {
+ return "'" + value + "'";
+ } else {
+ throw new UnsupportedOperationException("Cannot pushdown filters if \"
and ' both exist");
+ }
+ }
+
+ private Option<String> extractLiteralValue(String type, String value) {
+ switch (type.toLowerCase(Locale.ROOT)) {
+ case HiveSchemaUtil.STRING_TYPE_NAME:
+ return Option.of(quoteStringLiteral(value));
+ case HiveSchemaUtil.INT_TYPE_NAME:
+ case HiveSchemaUtil.BIGINT_TYPE_NAME:
+ case HiveSchemaUtil.DATE_TYPE_NAME:
+ return Option.of(value);
+ default:
+ return Option.empty();
+ }
+ }
+
+ private Option<String> generateEqualFilter(String key, String type, String
value) {
+ Option<String> extracted = extractLiteralValue(type, value);
+ if (extracted.isPresent()) {
+ return Option.of(key + " = " + extracted.get());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Visit the partition and generate filters.
+ *
+ * Examples:
+ * 1. date=2022-09-20 => date = 2022-09-20
+ * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+ * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND
min = 30)
+ */
+ private String visitPartition(List<String> partitionKeys,
+ Map<String, String> keyWithTypes,
+ List<String> partitionValues) {
+ if (partitionKeys.size() != partitionValues.size()) {
+ throw new HoodieHiveSyncException("Partition key and values should be
same length"
+ + ", but got partitionKey: " + partitionKeys + " with values: " +
partitionValues);
+ }
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionValues.size(); i++) {
+ String key = partitionKeys.get(i);
+ combineFilter(filterBuilder, "AND",
+ generateEqualFilter(key, keyWithTypes.get(key),
partitionValues.get(i)));
+ }
+
+ return filterBuilder.toString();
+ }
+
+ protected String generateWrittenPartitionsFilter(String tableName,
+ List<String> partitionKeys,
+ List<List<String>>
partitionVals) {
+ // Hive store columns to lowercase, so we need to map partitions to
lowercase to avoid any mismatch.
+ List<String> normalizedPartitionKeys = partitionKeys.stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<String> partitionTypes =
syncClient.getMetastoreFieldSchemas(tableName)
+ .stream()
+ .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+ .map(FieldSchema::getType)
+ .collect(Collectors.toList());
+
+ if (partitionTypes.size() == 0) {
+ throw new HoodieHiveSyncException("Cannot get partition types from
SyncClient, maybe "
+ + "table schema is not synced");
+ }
+
+ Map<String, String> keyWithTypes =
CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+ StringBuilder filterBuilder = new StringBuilder();
+ for (int i = 0; i < partitionVals.size(); i++) {
+ combineFilter(filterBuilder, "OR",
+ Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes,
partitionVals.get(i))));
+ }
+ return filterBuilder.toString();
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to
avoid fetching
+ * too many unnecessary partitions.
+ */
+ private List<Partition> getTablePartitions(String tableName, List<String>
writtenPartitionsSince) {
+ PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+
.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+ List<String> partitionKeys =
config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+ List<List<String>> partitionVals = writtenPartitionsSince
+ .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+ .filter(values -> !values.isEmpty())
+ .collect(Collectors.toList());
+
+ if (partitionVals.isEmpty()) {
+ // No partition is written
+ return Collections.emptyList();
+ }
+
+ int estimateSize = partitionKeys.size() * partitionVals.size();
+ if (estimateSize >
config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
Review Comment:
any empirical evidence to share and show that usually more than 1000 filters
will be less performant than fetch all partitions? how should we guide users on
setting this config?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]