nsivabalan commented on a change in pull request #3778: URL: https://github.com/apache/hudi/pull/3778#discussion_r731849802
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java ########## @@ -7,72 +7,77 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.index; +package org.apache.hudi.index.inmemory; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; /** * Hoodie Index implementation backed by an in-memory Hash map. 
* <p> * ONLY USE FOR LOCAL TESTING */ -@SuppressWarnings("checkstyle:LineLength") -public class JavaInMemoryHashIndex<T extends HoodieRecordPayload> extends JavaHoodieIndex<T> { +public class HoodieInMemoryHashIndex<T extends HoodieRecordPayload<T>> extends HoodieIndex<T> { private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap; - public JavaInMemoryHashIndex(HoodieWriteConfig config) { + public HoodieInMemoryHashIndex(HoodieWriteConfig config) { super(config); - synchronized (JavaInMemoryHashIndex.class) { + synchronized (HoodieInMemoryHashIndex.class) { if (recordLocationMap == null) { recordLocationMap = new ConcurrentHashMap<>(); } } } @Override - public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context, - HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) { - List<HoodieRecord<T>> taggedRecords = new ArrayList<>(); - records.stream().forEach(record -> { - if (recordLocationMap.containsKey(record.getKey())) { - record.unseal(); - record.setCurrentLocation(recordLocationMap.get(record.getKey())); - record.seal(); + public HoodieData<HoodieRecord<T>> tagLocation( + HoodieData<HoodieRecord<T>> records, HoodieEngineContext context, + HoodieTable hoodieTable) { + return records.mapPartitions(hoodieRecordIterator -> { Review comment: Nit: I see you are changing the logic a bit. In general, try to avoid making logic changes in refactoring PRs. We may not catch bugs introduced by such changes, since reviewers of a refactoring mostly focus on abstractions, code reuse, etc. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieRangeInfoHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. + */ +public class HoodieBloomIndex<T extends HoodieRecordPayload<T>> extends HoodieIndex<T> { Review comment: I assume this is just a rename of HoodieBaseBloomIndex, with HoodieBloomIndexHelper added. Let me know if there are any other changes. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.index.simple; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLocationFetchHandle; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.table.HoodieTable; + +import java.util.List; + +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * A simple index which reads interested fields(record key and partition path) from base files and + * joins with incoming records to find the tagged location. + * + * @param <T> type of {@link HoodieRecordPayload} + */ +public class HoodieSimpleIndex<T extends HoodieRecordPayload<T>> extends HoodieIndex<T> { Review comment: any changes as such compared to SparkHoodieSimpleIndex ? 
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java ########## @@ -18,54 +18,51 @@ package org.apache.hudi.index; -import org.apache.hudi.ApiMaturityLevel; -import org.apache.hudi.PublicAPIMethod; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; -import org.apache.hudi.index.bloom.SparkHoodieBloomIndex; -import org.apache.hudi.index.bloom.SparkHoodieGlobalBloomIndex; +import org.apache.hudi.index.bloom.HoodieBloomIndex; +import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; +import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; -import org.apache.hudi.index.simple.SparkHoodieGlobalSimpleIndex; -import org.apache.hudi.index.simple.SparkHoodieSimpleIndex; -import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; +import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; +import org.apache.hudi.index.simple.HoodieSimpleIndex; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import org.apache.spark.api.java.JavaRDD; +import java.io.IOException; -@SuppressWarnings("checkstyle:LineLength") -public abstract class SparkHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - 
protected SparkHoodieIndex(HoodieWriteConfig config) { - super(config); - } - - public static SparkHoodieIndex createIndex(HoodieWriteConfig config) { +/** + * A factory to generate Spark {@link HoodieIndex}. + */ +public final class SparkHoodieIndexFactory { Review comment: Yeah, renaming this to a factory has been long overdue :) Thanks. ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaPairRDDData; +import org.apache.hudi.data.HoodieJavaRDDData; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Helper for {@link HoodieBloomIndex} containing Spark-specific logic. + */ +public class SparkHoodieBloomIndexHelper extends HoodieBloomIndexHelper { + + private static final Logger LOG = LogManager.getLogger(SparkHoodieBloomIndexHelper.class); + + private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE = + new SparkHoodieBloomIndexHelper(); + + private SparkHoodieBloomIndexHelper() { + } + + public static SparkHoodieBloomIndexHelper getInstance() { + return SINGLETON_INSTANCE; + } + + @Override Review comment: I assume no code changes as such and you have just copied over as is. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * Holds a {@link List} of objects. + * + * @param <T> type of object. + */ +public class HoodieListData<T> extends HoodieData<T> { + + private final List<T> listData; + + private HoodieListData(List<T> listData) { + this.listData = listData; + } + + /** + * @param listData a {@link List} of objects in type T. + * @param <T> type of object. + * @return a new instance containing the {@link List<T>} reference. + */ + public static <T> HoodieListData<T> of(List<T> listData) { + return new HoodieListData<>(listData); + } + + /** + * @param hoodieData {@link HoodieListData<T>} instance containing the {@link List} of objects. + * @param <T> type of object. + * @return the a {@link List} of objects in type T. 
+ */ + public static <T> List<T> getList(HoodieData<T> hoodieData) { + return ((HoodieListData<T>) hoodieData).get(); + } + + @Override + public List<T> get() { + return listData; + } + + @Override + public boolean isEmpty() { + return listData.isEmpty(); + } + + @Override + public void persist(String cacheConfig) { + // No OP + } + + @Override + public void unpersist() { + // No OP + } + + @Override + public long count() { + return listData.size(); + } + + @Override + public <O> HoodieData<O> map(SerializableFunction<T, O> func) { + return HoodieListData.of(listData.stream().parallel() + .map(throwingMapWrapper(func)).collect(Collectors.toList())); + } + + @Override + public <O> HoodieData<O> mapPartitions( + SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning) { + List<O> result = new ArrayList<>(); + throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add); Review comment: this is effectively a map operation right. there is no map Partitions equivalent I guess ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java ########## @@ -18,54 +18,51 @@ package org.apache.hudi.index; -import org.apache.hudi.ApiMaturityLevel; -import org.apache.hudi.PublicAPIMethod; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; -import org.apache.hudi.index.bloom.SparkHoodieBloomIndex; -import 
org.apache.hudi.index.bloom.SparkHoodieGlobalBloomIndex; +import org.apache.hudi.index.bloom.HoodieBloomIndex; +import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; +import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; -import org.apache.hudi.index.simple.SparkHoodieGlobalSimpleIndex; -import org.apache.hudi.index.simple.SparkHoodieSimpleIndex; -import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; +import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; +import org.apache.hudi.index.simple.HoodieSimpleIndex; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import org.apache.spark.api.java.JavaRDD; +import java.io.IOException; -@SuppressWarnings("checkstyle:LineLength") -public abstract class SparkHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { - protected SparkHoodieIndex(HoodieWriteConfig config) { - super(config); - } - - public static SparkHoodieIndex createIndex(HoodieWriteConfig config) { +/** + * A factory to generate Spark {@link HoodieIndex}. + */ +public final class SparkHoodieIndexFactory { + public static HoodieIndex createIndex(HoodieWriteConfig config) { // first use index class config to create index. 
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config); if (!(instance instanceof HoodieIndex)) { throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex"); } - return (SparkHoodieIndex) instance; + return (HoodieIndex) instance; } switch (config.getIndexType()) { case HBASE: return new SparkHoodieHBaseIndex<>(config); case INMEMORY: - return new SparkInMemoryHashIndex(config); + return new HoodieInMemoryHashIndex<>(config); case BLOOM: - return new SparkHoodieBloomIndex<>(config); + return new HoodieBloomIndex<>(config, SparkHoodieBloomIndexHelper.getInstance()); case GLOBAL_BLOOM: - return new SparkHoodieGlobalBloomIndex<>(config); + return new HoodieGlobalBloomIndex<>(config, SparkHoodieBloomIndexHelper.getInstance()); case SIMPLE: - return new SparkHoodieSimpleIndex(config); + return new HoodieSimpleIndex<>(config, getKeyGeneratorForSimpleIndex(config)); Review comment: Not sure why we have to instantiate the keyGen here and pass it in. All we need is the write config, which we pass to the simple index anyway. May I know why we can't keep the keyGen instantiation within the index impl? Also, it looks out of place to have this in the factory when it is required only for some of the indexes. ########## File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java ########## @@ -62,6 +67,21 @@ public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier taskConte super(new SerializableConfiguration(conf), taskContextSupplier); } + @Override + public HoodieAccumulator createNewAccumulator() { Review comment: Not required in this patch. I see a lot of similarities between the Flink engine context and the Java engine context. Did you think about adding a ListBasedEngineContext and making the Flink and Java engine contexts inherit from it, so that we can reuse most of the code? 
########## File path: hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPairData.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.function.FunctionWrapper; +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * Implementation of {@link HoodiePairData} using Java {@link Map}. + * The pairs are organized by the key in the Map and values for the same key + * are stored in a list as the value corresponding to the key in the Map. + * + * @param <K> type of key. + * @param <V> type of value. 
+ */ +public class HoodieMapPairData<K, V> extends HoodiePairData<K, V> { + + private final Map<K, List<V>> mapPairData; + + private HoodieMapPairData(Map<K, List<V>> mapPairData) { + this.mapPairData = mapPairData; + } + + /** + * @param mapPairData a {@link Map} of pairs. + * @param <K> type of key. + * @param <V> type of value. + * @return a new instance containing the {@link Map<K, List<V>>} reference. + */ + public static <K, V> HoodieMapPairData<K, V> of(Map<K, List<V>> mapPairData) { + return new HoodieMapPairData<>(mapPairData); + } + + /** + * @param hoodiePairData {@link HoodieMapPairData<K, V>} instance containing the {@link Map} of pairs. + * @param <K> type of key. + * @param <V> type of value. + * @return the {@link Map} of pairs. + */ + public static <K, V> Map<K, List<V>> getMapPair(HoodiePairData<K, V> hoodiePairData) { + return ((HoodieMapPairData<K, V>) hoodiePairData).get(); + } + + @Override + public Map<K, List<V>> get() { + return mapPairData; + } + + @Override + public void persist(String cacheConfig) { + // No OP + } + + @Override + public void unpersist() { + // No OP + } + + @Override + public HoodieData<K> keys() { + return HoodieListData.of(new ArrayList<>(mapPairData.keySet())); + } + + @Override + public HoodieData<V> values() { + return HoodieListData.of( + mapPairData.values().stream().flatMap(List::stream).collect(Collectors.toList())); + } + + @Override + public long count() { + return mapPairData.values().stream().map( + list -> (long) list.size()).reduce(Long::sum).orElse(0L); + } + + @Override + public Map<K, Long> countByKey() { + return mapPairData.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> (long) entry.getValue().size())); + } + + @Override + public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) { + Function<Pair<K, V>, O> throwableFunc = throwingMapWrapper(func); + return HoodieListData.of( + streamAllPairs().map(throwableFunc).collect(Collectors.toList())); + } + + 
@Override + public <L, W> HoodiePairData<L, W> mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) { + Map<L, List<W>> newMap = new HashMap<>(); + Function<Pair<K, V>, Pair<L, W>> throwableMapToPairFunc = + FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc); + streamAllPairs().map(pair -> throwableMapToPairFunc.apply(pair)).forEach(newPair -> { + List<W> list = newMap.computeIfAbsent(newPair.getKey(), k -> new ArrayList<>()); + list.add(newPair.getValue()); + }); + return HoodieMapPairData.of(newMap); + } + + @Override + public <W> HoodiePairData<K, Pair<V, Option<W>>> leftOuterJoin(HoodiePairData<K, W> other) { Review comment: Not required in this patch, but can you add unit tests for these classes and their transformation methods? They are going to be used extensively across the board, so it would be good to have unit tests around them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
