This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit d0a7545c854e5ef99ed45edd1fe3520aa6dcaa74 Author: Vlad Rozov <[email protected]> AuthorDate: Mon Apr 30 07:11:20 2018 -0700 DRILL-6281: Introduce Collectors class for internal iterators closes #1238 --- .../drill/common/collections/Collectors.java | 123 +++++++++++++++++++++ .../org/apache/drill/exec/store/TimedCallable.java | 9 +- .../exec/store/parquet/metadata/Metadata.java | 17 ++- 3 files changed, 132 insertions(+), 17 deletions(-) diff --git a/common/src/main/java/org/apache/drill/common/collections/Collectors.java b/common/src/main/java/org/apache/drill/common/collections/Collectors.java new file mode 100644 index 0000000..3e80b2f --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/collections/Collectors.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.common.collections; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; +/** Static helpers that map {@code Map} entries or {@code Collection} elements into a {@code List} using {@code forEach} instead of the Stream API. */ +public class Collectors { + private Collectors() { + } + + /** + * Maps every entry of {@code map} through {@code mapper} into a new {@code List}. + * @param map {@code Map<K, V>} to collect elements from + * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T} + * @param <T> elements type in {@code List} + * @param <K> key type in {@code Map} + * @param <V> value type in {@code Map} + * @return new {@code List} that contains elements after applying mapper {@code BiFunction} to the input {@code Map} + */ + public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper) { + return collect(new ArrayList<>(map.size()), map, mapper); + } + + /** + * Maps every entry of {@code map} through {@code mapper}, keeping only the results accepted by {@code predicate}. + * @param map {@code Map<K, V>} to collect elements from + * @param mapper {@code BiFunction} that maps from (key, value) pair to type {@code T} + * @param predicate {@code Predicate} applied to each mapped value; only values it accepts are kept + * @param <T> elements type in {@code List} + * @param <K> key type in {@code Map} + * @param <V> value type in {@code Map} + * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code BiFunction} + * to the input {@code Map} + */ + public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) { + return collect(new ArrayList<>(map.size()), map, mapper, predicate); + } + /** Appends {@code mapper.apply(k, v)} for every entry of {@code map} to the caller-supplied {@code list} and returns that same {@code list}. */ + public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper) { + Preconditions.checkNotNull(list); + Preconditions.checkNotNull(map); + Preconditions.checkNotNull(mapper); + map.forEach((k, v) -> list.add(mapper.apply(k, v))); + return list; + } + /** Same as the three-argument {@code collect}, but appends only the mapped values accepted by {@code predicate}; returns {@code list}. */ + public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper, 
Predicate<T> predicate) { + Preconditions.checkNotNull(list); + Preconditions.checkNotNull(map); + Preconditions.checkNotNull(mapper); + Preconditions.checkNotNull(predicate); + map.forEach((k, v) -> { + T t = mapper.apply(k, v); + if (predicate.test(t)) { + list.add(t); + } + }); + return list; + } + + /** + * Maps every element of {@code collection} through {@code mapper} into a new {@code List}. + * @param collection {@code Collection<E>} of elements of type {@code E} + * @param mapper {@code Function<E, T>} mapper function to apply + * @param <T> elements type in {@code List} + * @param <E> elements type in {@code Collection} + * @return new {@code List} that contains elements after applying mapper {@code Function} + * to the input {@code Collection} + */ + public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper) { + Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(mapper); + ArrayList<T> list = new ArrayList<>(collection.size()); + collection.forEach(e -> list.add(mapper.apply(e))); + return list; + } + + /** + * Maps every element of {@code collection} through {@code mapper}, keeping only the results accepted by {@code predicate}. + * @param collection {@code Collection<E>} of elements of type {@code E} + * @param mapper {@code Function<E, T>} mapper function to apply + * @param predicate {@code Predicate} applied to each mapped value; only values it accepts are kept + * @param <T> elements type in {@code List} + * @param <E> elements type in {@code Collection} + * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code Function} to the input {@code Collection} + */ + public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper, Predicate<T> predicate) { + Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(mapper); + Preconditions.checkNotNull(predicate); + ArrayList<T> list = new ArrayList<>(collection.size()); + collection.forEach(e -> { + T t = mapper.apply(e); + if (predicate.test(t)) { + list.add(t); + } + }); + return list; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java index ecc5579..3c2bbfe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store; import java.io.IOException; import java.util.List; -import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -30,8 +29,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; +import org.apache.drill.common.collections.Collectors; import org.apache.drill.common.exceptions.UserException; import org.slf4j.Logger; @@ -212,11 +211,7 @@ public abstract class TimedCallable<V> implements Callable<V> { final FutureMapper<V> futureMapper = new FutureMapper<>(); final Statistics<V> statistics = logger.isDebugEnabled() ? 
new Statistics<>() : null; try { - return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS) - .stream() - .map(futureMapper) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + return Collectors.toList(threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS), futureMapper); } catch (InterruptedException e) { final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity); logger.error(errMsg, e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 49a6b52..cdf98e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.collections.Collectors; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.drill.exec.store.TimedCallable; @@ -68,7 +69,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; @@ -294,7 +294,7 @@ public class Metadata { Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream() .collect( - Collectors.toMap( + java.util.stream.Collectors.toMap( Function.identity(), s -> fs, (oldFs, newFs) -> newFs, @@ -335,14 +335,11 @@ public class Metadata { */ private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3( ParquetTableMetadata_v3 
parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException { - - List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream() - .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue())) - .collect(Collectors.toList()); - - List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>(); - metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16)); - return metaDataList; + return TimedCallable.run("Fetch parquet metadata", logger, + Collectors.toList(fileStatusMap, + (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)), + 16 + ); } /** -- To stop receiving notification emails like this one, please contact [email protected].
