garyli1019 commented on a change in pull request #2375: URL: https://github.com/apache/hudi/pull/2375#discussion_r554573020
########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; +import org.apache.hudi.common.testutils.FileCreateUtils; +import 
org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetWriter; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; + +public class HoodieFlinkWriteableTestTable extends HoodieTestTable { Review comment: This class could extend `HoodieWriteableTestTable`. Can we directly extend from there? ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java ########## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bloom; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; +import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; +import static java.util.UUID.randomUUID; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit test against FlinkHoodieBloomIndex. + */ +public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { Review comment: Thanks for adding the tests! 
Maybe another TODO comment here, to merge code with Spark Bloom index tests. ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java ########## @@ -133,4 +189,69 @@ public synchronized void invoke(HoodieRecord value, Context context) throws Exce valuesList.add(value); } } + + /** + * Cleanups hoodie clients. + */ + protected void cleanupClients() throws java.io.IOException { + if (metaClient != null) { + metaClient = null; + } + if (writeClient != null) { + writeClient.close(); + writeClient = null; + } + if (tableView != null) { + tableView.close(); + tableView = null; + } + } + + /** + * Cleanups test data generator. + * + */ + protected void cleanupTestDataGenerator() { + if (dataGen != null) { + dataGen = null; + } + } + + /** + * Cleanups the distributed file system. + * + * @throws IOException + */ + protected void cleanupDFS() throws java.io.IOException { + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + hdfsTestService = null; + dfsCluster = null; + dfs = null; + } + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the + // same JVM + FileSystem.closeAll(); + } + + /** + * Cleanups the executor service. + */ + protected void cleanupExecutorService() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + this.executorService = null; + } + } + + /** + * Cleanups Flink contexts. + */ + protected void cleanupFlinkContexts() { + if (context != null) { + LOG.info("Closing spark engine context used in previous test-case"); Review comment: flink engine ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java ########## @@ -133,4 +189,69 @@ public synchronized void invoke(HoodieRecord value, Context context) throws Exce valuesList.add(value); } } + + /** + * Cleanups hoodie clients. 
+ */ + protected void cleanupClients() throws java.io.IOException { + if (metaClient != null) { + metaClient = null; + } + if (writeClient != null) { + writeClient.close(); + writeClient = null; + } + if (tableView != null) { + tableView.close(); + tableView = null; + } + } + + /** + * Cleanups test data generator. + * + */ + protected void cleanupTestDataGenerator() { Review comment: This already exists in the base class. Please remove. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bloom; + +import com.beust.jcommander.internal.Lists; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.index.FlinkHoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.io.HoodieRangeInfoHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. 
+ */ +@SuppressWarnings("checkstyle:LineLength") +public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkHoodieIndex<T> { + + private static final Logger LOG = LogManager.getLogger(FlinkHoodieBloomIndex.class); + + public FlinkHoodieBloomIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context, + HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) { + // Step 1: Extract out thinner Map of (partitionPath, recordKey) + Map<String, List<String>> partitionRecordKeyMap = new HashMap<>(); + records.forEach(record -> { + if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) { + partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey()); + } else { + List<String> recordKeys = Lists.newArrayList(); + recordKeys.add(record.getRecordKey()); + partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys); + } + }); + + // Step 2: Lookup indexes for all the partition/recordkey pair + Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap = + lookupIndex(partitionRecordKeyMap, context, hoodieTable); + + if (LOG.isDebugEnabled()) { + long totalTaggedRecords = keyFilenamePairMap.values().size(); + LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); + } + + // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys + List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records); + + return taggedRecords; + } + + /** + * Lookup the location for each record key and return the pair<record_key,location> for all record keys already + * present and drop the record keys if not present. 
+ */ + private Map<HoodieKey, HoodieRecordLocation> lookupIndex( + Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context, + final HoodieTable hoodieTable) { + // Obtain records per partition, in the incoming records + Map<String, Long> recordsPerPartition = new HashMap<>(); + partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size()))); + List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); + + // Step 2: Load all involved files as <Partition, filename> pairs + List<Tuple2<String, BloomIndexFileInfo>> fileInfoList = + loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable); + final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = + fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))); + + // Step 3: Obtain a List, for each incoming record, that already exists, with the file id, + // that contains it. + List<Tuple2<String, HoodieKey>> fileComparisons = + explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap); + return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable); + } + + /** + * Load all involved files as <Partition, filename> pair List. + */ + List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context, Review comment: Duplicate code with Spark, but it's ok for now. We need to do some refactoring work later. Can we add a TODO in the comment section as a reminder? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import com.beust.jcommander.internal.Lists; Review comment: nit: we usually follow the import order: hudi -> 3rd party packages -> java -> scala -> static. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.io.HoodieKeyLookupHandle.KeyLookupResult; +import org.apache.hudi.table.HoodieTable; + +import scala.Tuple2; + +import java.util.function.Function; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Function performing actual checking of List partition containing (fileId, hoodieKeys) against the actual files. + */ +public class HoodieFlinkBloomIndexCheckFunction + implements Function<Iterator<Tuple2<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> { + + private final HoodieTable hoodieTable; + + private final HoodieWriteConfig config; + + public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { + this.hoodieTable = hoodieTable; + this.config = config; + } + + @Override + public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) { Review comment: I think we can wrap this up and reuse it in the `HoodieBloomIndexCheckFunction` from spark client, so we can move this into the `hudi-client-common`, but let's add a TODO comment for now. ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java ########## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; +import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; +import static java.util.UUID.randomUUID; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit test against FlinkHoodieBloomIndex. + */ +public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { + + private static final Schema SCHEMA = getSchemaFromResource(TestFlinkHoodieBloomIndex.class, "/exampleSchema.avsc", true); + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}"; + + public static java.util.stream.Stream<org.junit.jupiter.params.provider.Arguments> configParams() { Review comment: nit: please move packages to import ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java ########## @@ -58,6 +83,15 @@ public void setTestMethodName(TestInfo testInfo) { } } + @org.junit.jupiter.api.BeforeEach Review comment: let's remove this, we don't have to call this every time. ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java ########## @@ -18,10 +18,16 @@ package org.apache.hudi.testutils; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; Review comment: nit: import sort ########## File path: hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java ########## @@ -66,6 +100,15 @@ protected void initFlinkMiniCluster() { .build()); } + /** + * Initializes the Flink contexts with the given application name. 
+ * + */ + protected void initFlinkContexts() { + context = new HoodieFlinkEngineContext(supplier); + hadoopConf = context.getHadoopConf().get(); Review comment: This was overridden by `initFileSystem()` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
