vinothchandar commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r781737678
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
+ @Override
+ public Stream<Pair<String, BloomIndexFileInfo>> apply(String
partitionName) throws Exception {
+ final String columnIndexID = new
ColumnIndexID(keyField).asBase64EncodedString();
Review comment:
Need a deeper look, but let's use `id` in naming only when it is actually
the id and not a field name.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
Review comment:
Fix the doc to cover the metadata-table-specific details.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
Review comment:
Let's write a lambda here instead of an anonymous class?
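A minimal sketch of what the lambda form could look like. The `SerializableFunction` interface and the `flatMap` helper below are local stand-ins declared only so the snippet compiles on its own; they are assumptions, not the actual Hudi signatures, and the file names are made up.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class LambdaSketch {

  // Local stand-in for a serializable function type (assumption, not Hudi's class).
  @FunctionalInterface
  interface SerializableFunction<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  // Local stand-in for an engine-context flatMap; runs sequentially and ignores parallelism.
  static <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
    List<O> out = new ArrayList<>();
    for (I item : data) {
      try {
        func.apply(item).forEach(out::add);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    return out;
  }

  // Shape of the code as it stands in the diff: an anonymous class.
  static List<String> withAnonymousClass(List<String> partitions) {
    return flatMap(partitions, new SerializableFunction<String, Stream<String>>() {
      @Override
      public Stream<String> apply(String partitionName) {
        return Stream.of(partitionName + "/f1", partitionName + "/f2");
      }
    }, Math.max(partitions.size(), 1));
  }

  // Same behavior written as a lambda.
  static List<String> withLambda(List<String> partitions) {
    return flatMap(partitions,
        partitionName -> Stream.of(partitionName + "/f1", partitionName + "/f2"),
        Math.max(partitions.size(), 1));
  }

  public static void main(String[] args) {
    System.out.println(withLambda(Arrays.asList("2021/12/01")));
  }
}
```

Both variants produce the same result; the lambda only removes the boilerplate around `apply`.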
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I,
K, O> extends HoodieReadHandle<T,
Review comment:
Can we share more code with the existing LookupHandle?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I,
K, O> extends HoodieReadHandle<T,
+ I, K, O> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+ private final HoodieTableType tableType;
+ private final BloomFilter bloomFilter;
+ private final List<String> candidateRecordKeys;
+ private long totalKeysChecked;
+
+ public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config,
HoodieTable<T, I, K, O> hoodieTable,
+ Pair<String, String>
partitionPathFileIdPair, String fileName) {
+ super(config, null, hoodieTable, partitionPathFileIdPair);
+ this.tableType = hoodieTable.getMetaClient().getTableType();
+ this.candidateRecordKeys = new ArrayList<>();
+ this.totalKeysChecked = 0;
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Option<ByteBuffer> bloomFilterByteBuffer =
+ hoodieTable.getMetadataTable().getBloomFilter(new
PartitionIndexID(partitionPathFileIdPair.getLeft()),
+ new FileIndexID(fileName));
+ if (!bloomFilterByteBuffer.isPresent()) {
+ throw new HoodieIndexException("BloomFilter missing for " + fileName);
+ }
+
+ this.bloomFilter =
+ new
HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
+ BloomFilterTypeCode.DYNAMIC_V0);
+ LOG.debug(String.format("Read bloom filter from %s,%s, size: %s in %d ms",
partitionPathFileIdPair, fileName,
+ bloomFilterByteBuffer.get().array().length, timer.endTimer()));
+ }
+
+ /**
+ * Given a list of row keys and one file, return only row keys existing in
that file.
+ */
+ public List<String> checkCandidatesAgainstFile(Configuration configuration,
List<String> candidateRecordKeys,
Review comment:
Can you add a comment on the PR to indicate how much of this file is new
vs. copied over?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
+ @Override
+ public Stream<Pair<String, BloomIndexFileInfo>> apply(String
partitionName) throws Exception {
Review comment:
We discussed reading all of this from the driver, correct? That is, fetching
the entire list of stats for a key column alone?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +170,75 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
+ @Override
+ public Stream<Pair<String, BloomIndexFileInfo>> apply(String
partitionName) throws Exception {
+ final String columnIndexID = new
ColumnIndexID(keyField).asBase64EncodedString();
+ final String partitionIndexID = new
PartitionIndexID(partitionName).asBase64EncodedString();
+
+ List<Pair<String, String>> partitionFileIdList =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+ hoodieTable).stream().map(baseFile ->
Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+ .collect(toList());
+ try {
+ Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+ List<String> columnStatKeys = new ArrayList<>();
+ for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+ final String columnStatIndexKey = columnIndexID
+ .concat(partitionIndexID)
+ .concat(new
FileIndexID(fileIdFileName.getRight()).asBase64EncodedString());
+ columnStatKeys.add(columnStatIndexKey);
+ columnStatKeyToFileIdMap.put(columnStatIndexKey,
fileIdFileName.getLeft());
+ }
+ if (columnStatKeys.isEmpty()) {
+ return Stream.empty();
+ }
+
+ Collections.sort(columnStatKeys);
+ Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+ .getMetadataTable().getColumnStats(columnStatKeys);
Review comment:
This code is doing a lot of munging and object conversion. Let's move this
down into `getColumnStats()`. Callers of the metadata table's `getXXX()` methods
should not be aware of the ids, sorting, and other such implementation details.
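A rough sketch of the shape this could take, assuming a lookup method that accepts plain column, partition, and file names and keeps key construction and sorting private. The class, the `ColumnRange` type, and the Base64-of-raw-string encoding are all hypothetical stand-ins chosen to keep the snippet self-contained; they are not the real `ColumnIndexID`/`PartitionIndexID`/`FileIndexID` encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ColumnStatsLookupSketch {

  /** Min/max range of one column in one file (hypothetical type). */
  static final class ColumnRange {
    final String min;
    final String max;

    ColumnRange(String min, String max) {
      this.min = min;
      this.max = max;
    }
  }

  // In-memory map standing in for the metadata table partition.
  private final Map<String, ColumnRange> store = new HashMap<>();

  // Hypothetical encoding; callers never see it.
  private static String encode(String value) {
    return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
  }

  private static String indexKey(String column, String partition, String fileName) {
    return encode(column) + encode(partition) + encode(fileName);
  }

  void put(String column, String partition, String fileName, ColumnRange range) {
    store.put(indexKey(column, partition, fileName), range);
  }

  /** Returns file name to range, for the files that have stats. Keys are built and sorted here. */
  Map<String, ColumnRange> getColumnStats(String column, String partition, List<String> fileNames) {
    // TreeMap keeps the encoded keys sorted, mirroring the sort done by the caller in the diff.
    Map<String, String> keyToFile = new TreeMap<>();
    for (String fileName : fileNames) {
      keyToFile.put(indexKey(column, partition, fileName), fileName);
    }
    Map<String, ColumnRange> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : keyToFile.entrySet()) {
      ColumnRange range = store.get(entry.getKey());
      if (range != null) {
        result.put(entry.getValue(), range);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    ColumnStatsLookupSketch table = new ColumnStatsLookupSketch();
    table.put("uuid", "2021/12/01", "a.parquet", new ColumnRange("k01", "k10"));
    System.out.println(table.getColumnStats("uuid", "2021/12/01", Arrays.asList("a.parquet")).keySet());
  }
}
```

With this shape the index code would only map the returned ranges to `BloomIndexFileInfo`, without touching encoded keys.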
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log
records. If disabled, hudi does look up of only interested entries.");
+ public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER =
ConfigProperty
Review comment:
You are assuming it's always the key field that the bloom filter points
to. We also need another config where the user can specify the list of
columns/fields to track bloom filters for.
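One possible shape for such a config, sketched under assumptions: the property name below is made up for illustration, and falling back to the record key field when nothing is configured is a guess at a sensible default, not something decided in this PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

final class BloomFilterColumnsSketch {

  // Hypothetical property name, used only in this sketch.
  static final String COLUMNS_KEY = "example.metadata.index.bloom.filter.columns";

  /** Parses a comma-separated column list; falls back to the record key field when unset or empty. */
  static List<String> columnsToIndex(Properties props, String recordKeyField) {
    String raw = props.getProperty(COLUMNS_KEY, "");
    List<String> columns = Arrays.stream(raw.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .distinct()
        .collect(Collectors.toList());
    return columns.isEmpty() ? Collections.singletonList(recordKeyField) : columns;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(COLUMNS_KEY, "uuid, email");
    System.out.println(columnsToIndex(props, "uuid"));
  }
}
```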
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -159,6 +173,71 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
}
}
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ if (config.getBloomIndexPruneByRanges()) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, new SerializableFunction<String,
Stream<Pair<String, BloomIndexFileInfo>>>() {
+ @Override
+ public Stream<Pair<String, BloomIndexFileInfo>> apply(String
partitionName) throws Exception {
+ final String columnIndexID = new
ColumnIndexID(keyField).asBase64EncodedString();
+ final String partitionIndexID = new
PartitionIndexID(partitionName).asBase64EncodedString();
+
+ List<Pair<String, String>> partitionFileIdList =
+ HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+ hoodieTable).stream().map(baseFile ->
Pair.of(baseFile.getFileId(), baseFile.getFileName()))
+ .collect(toList());
+ try {
+ Map<String, String> columnStatKeyToFileIdMap = new HashMap<>();
+ List<String> columnStatKeys = new ArrayList<>();
+ for (Pair<String, String> fileIdFileName : partitionFileIdList) {
+ final String columnStatIndexKey = columnIndexID
+ .concat(partitionIndexID)
+ .concat(new
FileIndexID(fileIdFileName.getLeft()).asBase64EncodedString());
+ columnStatKeys.add(columnStatIndexKey);
+ columnStatKeyToFileIdMap.put(columnStatIndexKey,
fileIdFileName.getLeft());
+ }
+ Collections.sort(columnStatKeys);
+
+ Map<String, HoodieColumnStats> columnKeyHashToStatMap = hoodieTable
+ .getMetadataTable().getColumnStats(columnStatKeys);
+ List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
+ for (Map.Entry<String, HoodieColumnStats> entry :
columnKeyHashToStatMap.entrySet()) {
+ result.add(Pair.of(partitionName,
+ new BloomIndexFileInfo(
+ columnStatKeyToFileIdMap.get(entry.getKey()),
+ entry.getValue().getMinValue(),
+ entry.getValue().getMaxValue()
+ )));
+ }
+ return result.stream();
+ } catch (MetadataNotFoundException me) {
+ throw new HoodieMetadataException("Unable to find column range
metadata for partition:" + partitionName, me);
+ }
+ }
+ }, Math.max(partitions.size(), 1));
+ } else {
+ // Obtain the latest data files from all the partitions.
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
Review comment:
+1
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I,
K, O> extends HoodieReadHandle<T,
+ I, K, O> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+ private final HoodieTableType tableType;
+ private final BloomFilter bloomFilter;
+ private final List<String> candidateRecordKeys;
+ private long totalKeysChecked;
+
+ public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config,
HoodieTable<T, I, K, O> hoodieTable,
+ Pair<String, String>
partitionPathFileIdPair, String fileId) {
+ super(config, null, hoodieTable, partitionPathFileIdPair);
+ this.tableType = hoodieTable.getMetaClient().getTableType();
+ this.candidateRecordKeys = new ArrayList<>();
+ this.totalKeysChecked = 0;
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Option<ByteBuffer> bloomFilterByteBuffer =
Review comment:
+1. It's duplicating a lot at this point.
##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
"doc": "Type of the metadata record",
"type": "int"
},
- { "name": "filesystemMetadata",
+ {
"doc": "Contains information about partitions and files within the
dataset",
- "type": ["null", {
- "type": "map",
- "values": {
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of bloom filters for all data files in the
user table",
+ "name": "BloomFilterMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file bloom filter details",
+ "name": "HoodieMetadataBloomFilter",
"type": "record",
- "name": "HoodieMetadataFileInfo",
"fields": [
{
- "name": "size",
- "type": "long",
- "doc": "Size of the file"
+ "doc": "Bloom filter type code",
+ "name": "type",
+ "type": "string"
+ },
+ {
+ "doc": "Instant timestamp when this metadata was
created/updated",
+ "name": "timestamp",
+ "type": "string"
+ },
+ {
+ "doc": "Bloom filter binary byte array",
+ "name": "bloomFilter",
+ "type": "bytes"
},
{
+ "doc": "Bloom filter entry valid/deleted flag",
"name": "isDeleted",
- "type": "boolean",
- "doc": "True if this file has been deleted"
+ "type": "boolean"
+ },
+ {
+ "doc": "Reserved bytes for future use",
+ "name": "reserved",
Review comment:
We can evolve the schema, right?
##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
"doc": "Type of the metadata record",
"type": "int"
},
- { "name": "filesystemMetadata",
+ {
"doc": "Contains information about partitions and files within the
dataset",
- "type": ["null", {
- "type": "map",
- "values": {
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of bloom filters for all data files in the
user table",
+ "name": "BloomFilterMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file bloom filter details",
+ "name": "HoodieMetadataBloomFilter",
"type": "record",
- "name": "HoodieMetadataFileInfo",
"fields": [
{
- "name": "size",
- "type": "long",
- "doc": "Size of the file"
+ "doc": "Bloom filter type code",
+ "name": "type",
+ "type": "string"
+ },
+ {
+ "doc": "Instant timestamp when this metadata was
created/updated",
+ "name": "timestamp",
+ "type": "string"
+ },
+ {
+ "doc": "Bloom filter binary byte array",
+ "name": "bloomFilter",
+ "type": "bytes"
},
{
+ "doc": "Bloom filter entry valid/deleted flag",
"name": "isDeleted",
- "type": "boolean",
- "doc": "True if this file has been deleted"
+ "type": "boolean"
+ },
+ {
+ "doc": "Reserved bytes for future use",
+ "name": "reserved",
+ "type": "bytes"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of column ranges for all data files in the
user table",
+ "name": "ColumnStatsMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file column ranges details",
+ "name": "HoodieColumnStats",
+ "type": "record",
+ "fields": [
+ {
+ "doc": "Minimum value in the range. Based on user
data table schema, we can convert this to appropriate type",
+ "name": "minValue",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "doc": "Maximum value in the range. Based on user
data table schema, we can convert it to appropriate type",
+ "name": "maxValue",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "doc": "Maximum value in the range. Based on user
data table schema, we can convert it to appropriate type",
Review comment:
Yes, let's get them all added in one shot.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupHandle.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyMetaIndexLookupHandle<T extends HoodieRecordPayload, I,
K, O> extends HoodieReadHandle<T,
+ I, K, O> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieKeyMetaIndexLookupHandle.class);
+ private final HoodieTableType tableType;
+ private final BloomFilter bloomFilter;
+ private final List<String> candidateRecordKeys;
+ private long totalKeysChecked;
+
+ public HoodieKeyMetaIndexLookupHandle(HoodieWriteConfig config,
HoodieTable<T, I, K, O> hoodieTable,
+ Pair<String, String>
partitionPathFileIdPair, String fileName) {
+ super(config, null, hoodieTable, partitionPathFileIdPair);
+ this.tableType = hoodieTable.getMetaClient().getTableType();
+ this.candidateRecordKeys = new ArrayList<>();
+ this.totalKeysChecked = 0;
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Option<ByteBuffer> bloomFilterByteBuffer =
+ hoodieTable.getMetadataTable().getBloomFilter(new
PartitionIndexID(partitionPathFileIdPair.getLeft()),
Review comment:
Again, up-level these APIs.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyMetaIndexLookupResult.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import java.util.List;
+
+/**
+ * Encapsulates the result from a key lookup.
+ */
+public class HoodieKeyMetaIndexLookupResult {
Review comment:
Again, can we reuse code?
##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,118 @@
"doc": "Type of the metadata record",
"type": "int"
},
- { "name": "filesystemMetadata",
+ {
"doc": "Contains information about partitions and files within the
dataset",
- "type": ["null", {
- "type": "map",
- "values": {
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of bloom filters for all data files in the
user table",
+ "name": "BloomFilterMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file bloom filter details",
+ "name": "HoodieMetadataBloomFilter",
"type": "record",
- "name": "HoodieMetadataFileInfo",
"fields": [
{
- "name": "size",
- "type": "long",
- "doc": "Size of the file"
+ "doc": "Bloom filter type code",
+ "name": "type",
+ "type": "string"
+ },
+ {
+ "doc": "Instant timestamp when this metadata was
created/updated",
+ "name": "timestamp",
+ "type": "string"
+ },
+ {
+ "doc": "Bloom filter binary byte array",
+ "name": "bloomFilter",
+ "type": "bytes"
},
{
+ "doc": "Bloom filter entry valid/deleted flag",
"name": "isDeleted",
- "type": "boolean",
- "doc": "True if this file has been deleted"
+ "type": "boolean"
+ },
+ {
+ "doc": "Reserved bytes for future use",
+ "name": "reserved",
+ "type": "bytes"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of column ranges for all data files in the
user table",
+ "name": "ColumnStatsMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file column ranges details",
+ "name": "HoodieColumnStats",
+ "type": "record",
+ "fields": [
+ {
+ "doc": "Minimum value in the range. Based on user
data table schema, we can convert this to appropriate type",
+ "name": "minValue",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "doc": "Maximum value in the range. Based on user
data table schema, we can convert it to appropriate type",
+ "name": "maxValue",
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ {
+ "doc": "Maximum value in the range. Based on user
data table schema, we can convert it to appropriate type",
+ "name": "nullCount",
+ "type": [
+ "null",
+ "long"
+ ]
+ },
+ {
+ "doc": "Column range entry valid/deleted flag",
+ "name": "isDeleted",
+ "type": "boolean"
+ },
+ {
+ "doc": "Reserved bits for future use",
Review comment:
Again, why do we need this?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -101,10 +102,11 @@ protected void initRegistry() {
}
@Override
- protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String
partitionName, String instantTime, boolean canTriggerTableService) {
+ protected void commit(String instantTime, Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap,
Review comment:
We should see if we can reduce the code duplication here between the Flink
and Spark writers, now that we have the `HoodieData` abstraction. Let's land it
in a separate PR.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -109,6 +109,7 @@
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
+ protected final List<MetadataPartitionType> enabledPartitionTypes;
Review comment:
We should track `enabledPartitions`, not just partition types; i.e., this
layer should let writers create even multiple bloom filter
partitions, to track, say, a secondary key's bloom filters as well.
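To illustrate the distinction, here is a small sketch in which each enabled partition carries both a type and a name, so one type can back several partitions. The `MetadataPartition` class, the local `PartitionType` enum, and the `bloom_filters_<column>` naming are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class EnabledPartitionsSketch {

  // Local stand-in enum (assumption); not the real MetadataPartitionType.
  enum PartitionType { FILES, BLOOM_FILTERS, COLUMN_STATS }

  /** A concrete partition: a type plus a distinct name. */
  static final class MetadataPartition {
    final PartitionType type;
    final String name;

    MetadataPartition(PartitionType type, String name) {
      this.type = type;
      this.name = name;
    }

    @Override
    public String toString() {
      return type + ":" + name;
    }
  }

  /** Builds one files partition plus one bloom filter partition per tracked column. */
  static List<MetadataPartition> enabledPartitions(List<String> bloomFilterColumns) {
    List<MetadataPartition> enabled = new ArrayList<>();
    enabled.add(new MetadataPartition(PartitionType.FILES, "files"));
    for (String column : bloomFilterColumns) {
      // Hypothetical naming scheme for per-column bloom filter partitions.
      enabled.add(new MetadataPartition(PartitionType.BLOOM_FILTERS, "bloom_filters_" + column));
    }
    return enabled;
  }

  public static void main(String[] args) {
    System.out.println(enabledPartitions(Arrays.asList("uuid", "email")));
  }
}
```

A list of types alone could not express the two bloom filter partitions above, which is the gap the comment points at.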