danny0405 commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1616449812
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,8 +396,8 @@ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> ol
*/
public static String getPartitionNameFromPartitionType(MetadataPartitionType
partitionType, HoodieTableMetaClient metaClient, String indexName) {
if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(partitionType)) {
- checkArgument(metaClient.getFunctionalIndexMetadata().isPresent(),
"Index definition is not present");
- return
metaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
+ checkArgument(metaClient.getIndexesMetadata().isPresent(), "Index
definition is not present");
Review Comment:
`getIndicesMetadata` or just `getIndexMetadata`
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexesMetadata.java:
##########
@@ -76,19 +76,19 @@ public String toJson() throws JsonProcessingException {
* Deserialize from JSON string to create an instance of this class.
*
* @param json Input JSON string.
- * @return Deserialized instance of HoodieFunctionalIndexMetadata.
+ * @return Deserialized instance of HoodieIndexesMetadata.
* @throws IOException If any deserialization errors occur.
*/
- public static HoodieFunctionalIndexMetadata fromJson(String json) throws
IOException {
+ public static HoodieIndexesMetadata fromJson(String json) throws IOException
{
if (json == null || json.isEmpty()) {
- return new HoodieFunctionalIndexMetadata();
+ return new HoodieIndexesMetadata();
}
- return JsonUtils.getObjectMapper().readValue(json,
HoodieFunctionalIndexMetadata.class);
+ return JsonUtils.getObjectMapper().readValue(json,
HoodieIndexesMetadata.class);
}
@Override
public String toString() {
- return "HoodieFunctionalIndexMetadata{"
+ return "HoodieIndexesMetadata{"
Review Comment:
Let's avoid hard-coding the class name; we can fetch it through
`getClass().getSimpleName()`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -410,6 +413,14 @@ private boolean initializeFromFilesystem(String
initializationTime, List<Metadat
}
fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(partitionInfoList);
break;
+ case SECONDARY_INDEX:
+ Set<String> secondaryIndexPartitionsToInit =
getSecondaryIndexPartitionsToInit();
+ if (secondaryIndexPartitionsToInit.isEmpty()) {
+ continue;
+ }
+ ValidationUtils.checkState(secondaryIndexPartitionsToInit.size()
== 1, "Only one secondary index at a time is supported for now");
Review Comment:
Then what about the other ones? They would never get a chance to bootstrap,
right?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -77,7 +77,7 @@ class FunctionalIndexSupport(spark: SparkSession,
* Return true if metadata table is enabled and functional index metadata
partition is available.
*/
def isIndexAvailable: Boolean = {
- metadataConfig.isEnabled &&
metaClient.getFunctionalIndexMetadata.isPresent &&
!metaClient.getFunctionalIndexMetadata.get().getIndexDefinitions.isEmpty
+ metadataConfig.isEnabled && metaClient.getIndexesMetadata.isPresent &&
!metaClient.getIndexesMetadata.get().getIndexDefinitions.isEmpty
Review Comment:
`getIndexMetadata`
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -112,6 +113,14 @@ static boolean isMetadataTable(String basePath) {
return basePath.endsWith(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
}
+ static boolean isMetadataTableSecondaryIndexPartition(String basePath,
Option<String> partitionName) {
+ if (!isMetadataTable(basePath) || !partitionName.isPresent()) {
Review Comment:
Maybe we can move this utility to `HoodieTableMetadataUtil`.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexesMetadata.java:
##########
@@ -32,29 +32,29 @@
import java.util.Map;
/**
- * Represents the metadata for all functional indexes in Hudi.
+ * Represents the metadata for all functional and secondary indexes in Hudi.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class HoodieFunctionalIndexMetadata implements Serializable {
+public class HoodieIndexesMetadata implements Serializable {
Review Comment:
`HoodieIndexMetadata` should be fine.
##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the
field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");
+ public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME =
ConfigProperty
+ .key("hoodie.datasource.write.secondarykey.field")
Review Comment:
column or field, because in `HoodieMetadataConfig`, we have
`*.index.secondary.column`.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord>
readRecordKeysFromFileSlices(HoodieEngine
});
}
- public static Schema
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+ public static Schema
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Schema tableSchema = schemaResolver.getTableAvroSchema();
return addMetadataFields(getSchemaForFields(tableSchema,
indexDefinition.getSourceFields()));
}
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+ int
recordIndexMaxParallelism,
+ String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) {
+ if (partitionFiles.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFiles.size() + " partitions");
+ final int parallelism = Math.min(partitionFiles.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFiles,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final Pair<String, List<String>> baseAndLogFiles =
partitionAndBaseFile.getValue();
+ List<String> logFilePaths = new ArrayList<>();
+ baseAndLogFiles.getValue().forEach(logFile -> {
+ logFilePaths.add(basePath + StoragePath.SEPARATOR + partition +
StoragePath.SEPARATOR + logFile);
+ });
+ String filePath = baseAndLogFiles.getKey();
+
+ Option<StoragePath> dataFilePath = Option.empty();
+ Schema tableSchema;
+
+ if (!filePath.isEmpty()) {
+ dataFilePath = Option.of(filePath(basePath, "", filePath));
+ tableSchema =
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
dataFilePath.get());
+ } else {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(metaClient);
Review Comment:
Can we fetch the table schema out of the `flatMap` loop, i.e. on the Spark
driver? The table schema resolver is a little costly, and it has to list the
metadata files each time.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -45,14 +48,14 @@ public class HoodieFunctionalIndexDefinition implements
Serializable {
// Any other configuration or properties specific to the index
private Map<String, String> indexOptions;
- public HoodieFunctionalIndexDefinition() {
+ public HoodieIndexDefinition() {
}
- public HoodieFunctionalIndexDefinition(String indexName, String indexType,
String indexFunction, List<String> sourceFields,
- Map<String, String> indexOptions) {
+ public HoodieIndexDefinition(String indexName, String indexType, String
indexFunction, List<String> sourceFields,
Review Comment:
So after this change, we only have the secondary index, and the notion of a
functional index is removed.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -222,4 +227,48 @@ protected HoodieTable getHoodieTable(HoodieWriteConfig
writeConfig, HoodieTableM
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?>
initializeWriteClient() {
return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
}
+
+ @Override
+ protected HoodieData<HoodieRecord>
getSecondaryIndexRecordsFromFileSlices(List<Pair<String, FileSlice>>
partitionFileSlicePairs,
+
HoodieIndexDefinition indexDefinition,
+
int parallelism) throws IOException {
Review Comment:
Another choice is we add a `getEngineContext` abstract method for
sub-classes, and these methods impl can move to the base class.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1086,71 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
+ .stream()
+ .filter(partition ->
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .forEach(partition -> {
+ HoodieData<HoodieRecord> secondaryIndexRecords;
+ try {
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, writeStatus);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
+ }
+ partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+ });
+ }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs =
getPartitionFilePairs(commitMetadata);
+ // Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
+ // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
+ // operation is going to be expensive (in CPU, memory and IO)
+ List<String> keysToRemove = new ArrayList<>();
+ writeStatus.collectAsList().forEach(status -> {
+ status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+ // Consider those keys which were either updated or deleted in this
commit
+ if (!recordDelegate.getNewLocation().isPresent() ||
(recordDelegate.getCurrentLocation().isPresent() &&
recordDelegate.getNewLocation().isPresent())) {
+ keysToRemove.add(recordDelegate.getRecordKey());
+ }
+ });
+ });
+ HoodieIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
Review Comment:
Is the functional index notion already removed?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord>
readRecordKeysFromFileSlices(HoodieEngine
});
}
- public static Schema
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+ public static Schema
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Schema tableSchema = schemaResolver.getTableAvroSchema();
return addMetadataFields(getSchemaForFields(tableSchema,
indexDefinition.getSourceFields()));
}
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+ int
recordIndexMaxParallelism,
+ String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) {
+ if (partitionFiles.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFiles.size() + " partitions");
+ final int parallelism = Math.min(partitionFiles.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFiles,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final Pair<String, List<String>> baseAndLogFiles =
partitionAndBaseFile.getValue();
+ List<String> logFilePaths = new ArrayList<>();
+ baseAndLogFiles.getValue().forEach(logFile -> {
+ logFilePaths.add(basePath + StoragePath.SEPARATOR + partition +
StoragePath.SEPARATOR + logFile);
+ });
+ String filePath = baseAndLogFiles.getKey();
+
+ Option<StoragePath> dataFilePath = Option.empty();
+ Schema tableSchema;
+
+ if (!filePath.isEmpty()) {
+ dataFilePath = Option.of(filePath(basePath, "", filePath));
+ tableSchema =
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
dataFilePath.get());
+ } else {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(metaClient);
+ tableSchema = schemaResolver.getTableAvroSchema();
+ }
+
+ return createSecondaryIndexGenerator(metaClient, engineType,
logFilePaths, tableSchema, partition, dataFilePath, indexDefinition);
+ });
+ }
+
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+ int
recordIndexMaxParallelism,
+
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) throws IOException {
+ if (partitionFileSlicePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+ final int parallelism = Math.min(partitionFileSlicePairs.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFileSlicePairs,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final FileSlice fileSlice = partitionAndBaseFile.getValue();
+
+ final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ final String filename = baseFile.getFileName();
+ StoragePath dataFilePath = filePath(basePath, partition, filename);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+ Schema tableSchema = schemaResolver.getTableAvroSchema();
Review Comment:
ditto
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1833,12 +1838,146 @@ public static HoodieData<HoodieRecord>
readRecordKeysFromFileSlices(HoodieEngine
});
}
- public static Schema
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
+ public static Schema
getProjectedSchemaForFunctionalIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Schema tableSchema = schemaResolver.getTableAvroSchema();
return addMetadataFields(getSchemaForFields(tableSchema,
indexDefinition.getSourceFields()));
}
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
+
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+ int
recordIndexMaxParallelism,
+ String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) {
+ if (partitionFiles.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFiles.size() + " partitions");
+ final int parallelism = Math.min(partitionFiles.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFiles,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final Pair<String, List<String>> baseAndLogFiles =
partitionAndBaseFile.getValue();
+ List<String> logFilePaths = new ArrayList<>();
+ baseAndLogFiles.getValue().forEach(logFile -> {
+ logFilePaths.add(basePath + StoragePath.SEPARATOR + partition +
StoragePath.SEPARATOR + logFile);
+ });
+ String filePath = baseAndLogFiles.getKey();
+
+ Option<StoragePath> dataFilePath = Option.empty();
+ Schema tableSchema;
+
+ if (!filePath.isEmpty()) {
+ dataFilePath = Option.of(filePath(basePath, "", filePath));
+ tableSchema =
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getFileFormatUtils(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getStorage(),
dataFilePath.get());
+ } else {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(metaClient);
+ tableSchema = schemaResolver.getTableAvroSchema();
+ }
+
+ return createSecondaryIndexGenerator(metaClient, engineType,
logFilePaths, tableSchema, partition, dataFilePath, indexDefinition);
+ });
+ }
+
+ public static HoodieData<HoodieRecord>
readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+
List<Pair<String, FileSlice>> partitionFileSlicePairs,
+ int
recordIndexMaxParallelism,
+
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+
HoodieIndexDefinition indexDefinition) throws IOException {
+ if (partitionFileSlicePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+ final int parallelism = Math.min(partitionFileSlicePairs.size(),
recordIndexMaxParallelism);
+ final String basePath = metaClient.getBasePathV2().toString();
+ return engineContext.parallelize(partitionFileSlicePairs,
parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final FileSlice fileSlice = partitionAndBaseFile.getValue();
+
+ final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ final String filename = baseFile.getFileName();
+ StoragePath dataFilePath = filePath(basePath, partition, filename);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+ Schema tableSchema = schemaResolver.getTableAvroSchema();
+
+ List<String> logFilePaths =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
+ .map(l -> l.getPath().toString()).collect(toList());
+
+ return createSecondaryIndexGenerator(metaClient, engineType,
logFilePaths, tableSchema, partition, Option.of(dataFilePath), indexDefinition);
+ });
+ }
+
+ private static ClosableIterator<HoodieRecord>
createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
+
EngineType engineType, List<String> logFilePaths,
+
Schema tableSchema, String partition,
+
Option<StoragePath> dataFilePath,
+
HoodieIndexDefinition indexDefinition) throws Exception {
+ final String basePath = metaClient.getBasePathV2().toString();
+ final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
+
+ HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+ basePath,
+ engineType,
+ Collections.emptyList(),
+ metaClient.getTableConfig().getRecordMergerStrategy());
+
+ HoodieMergedLogRecordScanner mergedLogRecordScanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withStorage(metaClient.getStorage())
+ .withBasePath(basePath)
+ .withLogFilePaths(logFilePaths)
+ .withReaderSchema(tableSchema)
+
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(""))
+ .withReverseReader(false)
+
.withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(),
DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
+
.withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
+ .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
+ .withPartition(partition)
+ .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" +
HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
+ .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+
.withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+ .withRecordMerger(recordMerger)
+ .withTableMetaClient(metaClient)
+ .build();
+
+ Option<HoodieFileReader> baseFileReader = Option.empty();
+ if (dataFilePath.isPresent()) {
+ baseFileReader =
Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf),
dataFilePath.get()));
+ }
+ HoodieFileSliceReader fileSliceReader = new
HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema,
metaClient.getTableConfig().getPreCombineField(), recordMerger,
+ metaClient.getTableConfig().getProps(),
+ Option.empty());
+ ClosableIterator<HoodieRecord> fileSliceIterator =
ClosableIterator.wrap(fileSliceReader);
+ return new ClosableIterator<HoodieRecord>() {
+ @Override
+ public void close() {
+ fileSliceIterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return fileSliceIterator.hasNext();
+ }
+
+ @Override
+ public HoodieRecord next() {
+ HoodieRecord record = fileSliceIterator.next();
+ String recordKey = record.getRecordKey(tableSchema,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ String secondaryKeyFields = String.join(".",
indexDefinition.getSourceFields());
+ String secondaryKey;
+ try {
+ GenericRecord genericRecord = (GenericRecord)
(record.toIndexedRecord(tableSchema, new Properties()).get()).getData();
Review Comment:
You can use `CollectionUtils.EMPTY_PROPERTIES` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]