xushiyan commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1228989080
##########
hudi-common/src/main/avro/HoodieMetadata.avsc:
##########
@@ -358,6 +358,46 @@
}
],
"default" : null
+ },
+ {
+ "name": "recordIndexMetadata",
+      "doc": "Metadata Index that contains information about record keys and their location in the dataset",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "HoodieRecordIndexInfo",
+ "fields": [
+ {
+ "name": "partition",
+ "type": "string",
+ "doc": "Partition which contains the record",
+ "avro.java.string": "String"
+ },
+ {
+ "name": "fileIdHighBits",
+ "type": "long",
+              "doc": "fileId which contains the record (high 64 bits)"
+ },
+ {
+ "name": "fileIdLowBits",
+ "type": "long",
+              "doc": "fileId which contains the record (low 64 bits)"
+ },
+ {
+ "name": "fileIndex",
+ "type": "int",
+ "doc": "index of the file"
+ },
+ {
+ "name": "instantTime",
+ "type": "long",
+ "doc": "Timestamp at which record was added"
Review Comment:
This is in milliseconds, right? We should make that clear in the doc.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -301,24 +300,10 @@ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
Review Comment:
Somehow this boolean is not used, but it appears to be meant for validation: we should only proceed if the action is a table service.
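A minimal, self-contained sketch of the guard this comment asks for (class and method names are illustrative stand-ins, not the PR's actual code): compute the boolean and actually use it to gate the metadata update.

```java
// Hedged sketch: validate that the instant really is a table service action
// before proceeding, instead of computing the boolean and ignoring it.
// TableServiceGuard and its action strings are hypothetical stand-ins for
// table.isTableServiceAction(action, timestamp) in the Hudi codebase.
class TableServiceGuard {
  // Stand-in for the real table-service check
  static boolean isTableServiceAction(String action) {
    return action.equals("compaction") || action.equals("clustering") || action.equals("clean");
  }

  static void validateTableServiceCommit(String action) {
    boolean isTableServiceAction = isTableServiceAction(action);
    if (!isTableServiceAction) {
      throw new IllegalStateException("Expected a table service action but got: " + action);
    }
    // ... proceed with updateTableMetadata(...) only after the check passes
  }
}
```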
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -147,6 +154,16 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+ // HoodieMetadata record index payload field ids
+ public static final String RECORD_INDEX_FIELD_PARTITION = "partition";
+  public static final String RECORD_INDEX_FIELD_FILEID_HIGH_BITS = "fileIdHighBits";
+  public static final String RECORD_INDEX_FIELD_FILEID_LOW_BITS = "fileIdLowBits";
+ public static final String RECORD_INDEX_FIELD_FILE_INDEX = "fileIndex";
+ public static final String RECORD_INDEX_FIELD_INSTANT_TIME = "instantTime";
+
+  // FileIndex value saved in record index record when the fileId has no index (old format of base filename)
+  private static final int RECORD_INDEX_MISSING_FILEINDEX = -1;
Review Comment:
```suggestion
private static final int RECORD_INDEX_MISSING_FILEINDEX_FALLBACK = -1;
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -123,6 +124,9 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileWriterToAllowDuplicates(writeConfig.hfileWriterToAllowDuplicates())
Review Comment:
I think we should fail the init, with these considerations:
- When people use RLI, the table is supposed to have globally unique record keys.
- Tables with duplicates are meant for append-only use cases, so RLI is not applicable and should not be used.
- When we first launch an experimental feature, we should generally favor restricting usage over trying to accommodate everything; we can make it more flexible in future iterations.
In short, I think in 0.14.0 we should strictly disallow the duplicate scenario with RLI, as it breaks the uniqueness contract. Users should dedup first, or double-check whether RLI is really what they need.
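A self-contained sketch of the fail-fast the comment proposes (validator name and flags are illustrative, not Hudi's actual config API): reject the combination up front instead of tolerating duplicates at write time.

```java
// Hedged sketch: fail config creation when the record-level index (RLI) is
// combined with a setting that permits duplicate record keys.
// RecordIndexConfigValidator and its parameters are hypothetical names.
class RecordIndexConfigValidator {
  static void validate(boolean recordIndexEnabled, boolean allowDuplicateKeys) {
    if (recordIndexEnabled && allowDuplicateKeys) {
      throw new IllegalArgumentException(
          "Record index requires globally unique record keys; "
              + "disable the record index or deduplicate the table first");
    }
  }
}
```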
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -95,12 +96,18 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
     this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
     this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     this.timer = HoodieTimer.start();
-    this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     this.schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
     this.recordMerger = config.getRecordMerger();
+
+    // We need to track written records within WriteStatus in two cases:
+    // 1. When the HoodieIndex being used is not implicit with storage
+    // 2. If any of the metadata table partitions (record index, etc) which require written record tracking are enabled
+    final boolean trackSuccessRecords = !hoodieTable.getIndex().isImplicitWithStorage()
+        || HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), hoodieTable.getMetaClient());
Review Comment:
This could be encapsulated in a HoodieTable API, say `shouldTrackSuccessRecords()`.
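A sketch of what that encapsulation could look like, with the collaborating types stubbed out for illustration (the real method would live on HoodieTable and consult the metadata config):

```java
// Hedged sketch: fold the two conditions the constructor checks into one
// intention-revealing method. Index is a minimal stub of HoodieIndex; the
// boolean parameter stands in for the metadata-partition lookup.
class TrackingDecision {
  interface Index { boolean isImplicitWithStorage(); }

  static boolean shouldTrackSuccessRecords(Index index, boolean metadataPartitionsNeedTracking) {
    // Track written records when the index is not implicit with storage,
    // or when an enabled metadata partition (e.g. record index) needs them.
    return !index.isImplicitWithStorage() || metadataPartitionsNeedTracking;
  }
}
```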
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
+ return fallbackIndex.tagLocation(records, context, hoodieTable);
+ }
+
+ // final variable required for lamda functions below
+ final int numFileGroups = fileGroupSize;
+
+ // cache the input records if needed
Review Comment:
I think the code is quite obvious, so the comment would be redundant.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
+ return fallbackIndex.tagLocation(records, context, hoodieTable);
+ }
+
+ // final variable required for lamda functions below
Review Comment:
We can skip this comment.
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java:
##########
@@ -126,6 +131,15 @@ public boolean canWrite() {
   @Override
   public void writeAvro(String recordKey, IndexedRecord record) throws IOException {
+    if (prevRecordKey.equals(recordKey)) {
+      // When this config is enabled, do not allow duplicates to be written to hFile.
+      LOG.warn("Duplicate recordKey " + recordKey + " found while writing to HFile. Record payload " + record);
+      if (this.hfileConfig.allowDuplicatesToBeInserted()) {
+        return;
+      } else {
+        throw new HoodieDuplicateKeyException("Duplicate recordKey " + recordKey + " found while writing to HFile.");
+      }
+    }
Review Comment:
So this is to cater for tables that already have existing dups, or expect to have dups. Can we derive this from table config instead? E.g. if the precombine field is set, we do not allow dups. That would save an extra config here; an advanced config raises a usability hurdle.
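The derivation rule the comment gives as an example can be sketched as follows; note the rule itself (precombine field set implies duplicates disallowed) is the reviewer's proposal, not established Hudi behavior, and the class name is hypothetical.

```java
// Hedged sketch: derive the duplicate policy from existing table config
// rather than adding a new advanced flag.
class DuplicatePolicy {
  static boolean allowDuplicates(String precombineField) {
    // A table with a precombine field is expected to deduplicate on write,
    // so duplicate keys in the HFile would indicate a bug rather than a use case.
    return precombineField == null || precombineField.isEmpty();
  }
}
```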
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -322,6 +324,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
     records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
// Update files listing records for each individual partition
+ int[] newFileCount = {0};
Review Comment:
You could use `HoodieAtomicLongAccumulator` for this.
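The pattern behind the suggestion, sketched with the JDK's `AtomicLong` standing in for Hudi's accumulator (the surrounding method and list are illustrative): a single mutable counter that lambdas can update, avoiding the `int[]` workaround.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch: an effectively-final accumulator reference whose contents
// lambdas may mutate, in place of the int[] {0} trick. AtomicLong is used
// here as a stand-in for HoodieAtomicLongAccumulator.
class FileCounter {
  static long countNewFiles(List<Integer> filesPerPartition) {
    AtomicLong newFileCount = new AtomicLong();
    filesPerPartition.forEach(n -> newFileCount.addAndGet(n));
    return newFileCount.get();
  }
}
```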
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
Review Comment:
This validation should have been done when the config was first created, so we don't need to worry about it at use time.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -666,6 +691,75 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC
         .build();
   }
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to insert or update an entry for the record index.
+   * <p>
+   * Each entry maps the key of a single record in HUDI to its location.
+   *
+   * @param recordKey   Key of the record
+   * @param partition   Name of the partition which contains the record
+   * @param fileId      fileId which contains the record
+   * @param instantTime instantTime when the record was added
+   */
+  public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String recordKey, String partition,
+                                                                            String fileId, String instantTime) {
+    HoodieKey key = new HoodieKey(recordKey, MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+    // Data file names have a -D suffix to denote the index (D = integer) of the file written
+    // In older HUID versions the file index was missing
+    final UUID uuid;
+    final int fileIndex;
+    try {
+      if (fileId.length() == 36) {
+        uuid = UUID.fromString(fileId);
+        fileIndex = RECORD_INDEX_MISSING_FILEINDEX;
+      } else {
+        final int index = fileId.lastIndexOf("-");
+        uuid = UUID.fromString(fileId.substring(0, index));
+        fileIndex = Integer.parseInt(fileId.substring(index + 1));
+      }
+    } catch (Exception e) {
+      throw new HoodieMetadataException(String.format("Invalid UUID or index: fileID=%s, partition=%s, instantTIme=%s",
+          fileId, partition, instantTime), e);
+    }
+
+    // Store instantTime as milliseconds sinch epoch. Using an int here allows dates till year 2038.
Review Comment:
I don't quite get this comment: which value is the int?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+ fileGroupSize =
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ ValidationUtils.checkState(fileGroupSize > 0, "Record index should have
at least one file group");
+ } catch (TableNotFoundException | IllegalStateException e) {
+ // This means that record index has not been initialized.
+ LOG.warn(String.format("Record index not initialized so falling back to
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+ // Fallback to another index so that tagLocation is still accurate and
there are no duplicates.
+ HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+ HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
Review Comment:
Again, I think config-level validation can and should be done as early as the configs are loaded.
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java:
##########
@@ -545,6 +545,33 @@ public void close() {
}
}
+ @Override
+ public ClosableIterator<String> getRecordKeyIterator() {
+ final HFileScanner scanner = reader.getScanner(false, false);
Review Comment:
We should improve the names to indicate their usage clearly.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -73,35 +71,25 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   private static final Logger LOG = LoggerFactory.getLogger(BaseTableMetadata.class);
-  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying
-  //       on HFile (serving as persisted binary key-value mapping) to do caching
-  protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb
-
   protected final transient HoodieEngineContext engineContext;
   protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
   protected final HoodieMetadataConfig metadataConfig;
-  // Directory used for Spillable Map when merging records
-  protected final String spillableMapDirectory;
-  protected boolean isMetadataTableEnabled;
-  protected boolean isBloomFilterIndexEnabled = false;
-  protected boolean isColumnStatsIndexEnabled = false;
+  protected boolean isMetadataTableInitialized;
-  protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
-                              String dataBasePath, String spillableMapDirectory) {
+  protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String dataBasePath) {
     this.engineContext = engineContext;
     this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
     this.dataMetaClient = HoodieTableMetaClient.builder()
         .setConf(engineContext.getHadoopConf().get())
         .setBasePath(dataBasePath)
         .build();
-    this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
-    this.isMetadataTableEnabled = metadataConfig.enabled();
+    this.isMetadataTableInitialized = dataMetaClient.getTableConfig().isMetadataTableEnabled();
Review Comment:
Saving this value may lead to an inconsistency issue: the MDT could be physically deleted and hoodie.properties updated, but this cached value won't change. We should refresh the metaclient and always call `dataMetaClient.getTableConfig().isMetadataTableEnabled()` to get the latest value?
Also, we should rename `isMetadataTableEnabled()` to `isMetadataTableInitialized()` to distinguish it from reading `hoodie.metadata.enable`.
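The staleness issue can be shown with a minimal sketch (the class and its fields are illustrative stubs, not Hudi types): a flag captured at construction time never observes a later change, while reading through the config on every call does.

```java
// Hedged sketch: cached-at-construction vs fresh-read of a table property.
// `state` stands in for the content of hoodie.properties; the real fix would
// reload the metaclient's table config before each read.
class TableConfigView {
  private boolean metadataTableEnabled = true;            // hoodie.properties content
  private final boolean cachedAtConstruction = metadataTableEnabled;

  void metadataTableDeleted() { metadataTableEnabled = false; } // MDT dropped, properties updated

  boolean cachedValue() { return cachedAtConstruction; }  // frozen at construction
  boolean freshValue() { return metadataTableEnabled; }   // re-reads the config
}
```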
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -67,12 +57,20 @@ public static List<String> allPaths() {
);
}
+  /**
+   * Returns the list of metadata table partitions which require WriteStatus to track written records.
+   * <p>
+   * These partitions need the list of written records so that they can update their metadata.
+   */
+  public static List<MetadataPartitionType> needWriteStatusTracking() {
Review Comment:
`partitionsWithWriteStatusTracking()`; `needXXX` implies a boolean return.
I would actually suggest making this a property of `MetadataPartitionType`, to keep things cohesive.
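What "a property of MetadataPartitionType" could look like, sketched with a self-contained enum (constants and the flag assignments are illustrative, not the real partition list):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hedged sketch: write-status tracking as a per-constant property of the
// partition-type enum, with the list derived from it instead of maintained
// as a free-standing static list.
enum PartitionTypeSketch {
  FILES(false),
  COLUMN_STATS(false),
  RECORD_INDEX(true);   // needs written records to update key -> location entries

  private final boolean writeStatusTracking;

  PartitionTypeSketch(boolean writeStatusTracking) {
    this.writeStatusTracking = writeStatusTracking;
  }

  boolean isWriteStatusTrackingNeeded() {
    return writeStatusTracking;
  }

  static List<PartitionTypeSketch> partitionsWithWriteStatusTracking() {
    return Arrays.stream(values())
        .filter(PartitionTypeSketch::isWriteStatusTrackingNeeded)
        .collect(Collectors.toList());
  }
}
```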
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -683,12 +683,19 @@ public void initializeBootstrapDirsIfNotExists() throws IOException {
     initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
   }
+ /**
+ * Reload table config and cache.
+ */
+ public synchronized void reloadTableConfig() {
Review Comment:
We should add `@VisibleForTesting`.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object>
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+ // The index to fallback upon when record index is not initialized yet.
+ // This should be a global index like record index so that the behavior of
tagging across partitions is not changed.
+ private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE =
IndexType.GLOBAL_SIMPLE;
+
+ public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
+ int fileGroupSize;
+ try {
+
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
Review Comment:
This validation could be done at instantiation time?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]