Extracts common logic from TextIO.ReadAll into a utility transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf0b363 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf0b363 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf0b363 Branch: refs/heads/master Commit: eaf0b36313fcd59963b2efbf16f50dd913da7de2 Parents: e80c83b Author: Eugene Kirpichov <[email protected]> Authored: Fri Jul 21 14:09:13 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Jul 25 17:36:49 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/ReadAllViaFileBasedSource.java | 152 +++++++++++++++++++ .../java/org/apache/beam/sdk/io/TextIO.java | 135 ++++------------ 2 files changed, 179 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java new file mode 100644 index 0000000..66aa41e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Reads each filepattern in the input {@link PCollection} using given parameters for splitting + * files into offset ranges and for creating a {@link FileBasedSource} for a file. 
+ *
+ * <p>Each matched file is passed to {@code isSplittable} and {@code createSource} as the string
+ * form of its {@link MatchResult.Metadata#resourceId()}. A file that is read-seek-efficient and
+ * for which {@code isSplittable} returns {@code true} is split into offset ranges using {@code
+ * desiredBundleSizeBytes} as the desired range size; any other file is read as a single range.
+ *
+ * @param <T> the type of the records produced by the created sources
+ */
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+  /** Given a file name, returns whether that file may be split into multiple ranges. */
+  private final SerializableFunction<String, Boolean> isSplittable;
+  /** Desired size, in bytes, of each range a splittable file is divided into. */
+  private final long desiredBundleSizeBytes;
+  /** Given a file name, creates the {@link FileBasedSource} used to read that file. */
+  private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+  /**
+   * Creates a transform that reads all files matched by the input filepatterns.
+   *
+   * @param isSplittable returns whether the file with the given name may be split into ranges
+   * @param desiredBundleSizeBytes desired size, in bytes, of each range of a splittable file
+   * @param createSource creates a {@link FileBasedSource} for the file with the given name
+   */
+  public ReadAllViaFileBasedSource(
+      SerializableFunction<String, Boolean> isSplittable,
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, FileBasedSource<T>> createSource) {
+    this.isSplittable = isSplittable;
+    this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    this.createSource = createSource;
+  }
+
+  /**
+   * Expands the filepatterns, splits each matched file into ranges, redistributes the ranges with
+   * a reshuffle, and reads every range with a source created by {@code createSource}.
+   */
+  @Override
+  public PCollection<T> expand(PCollection<String> input) {
+    return input
+        .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+        .apply(
+            "Split into ranges",
+            ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
+  }
+
+  /** Reshuffles its input by attaching a generated key to each element and then dropping it. */
+  private static class ReshuffleWithUniqueKey<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+          .apply("Reshuffle", Reshuffle.<Integer, T>of())
+          .apply("Values", Values.<T>create());
+    }
+  }
+
+  /**
+   * Keys each element with a counter that starts at a random value chosen in {@link #setup()} and
+   * is incremented per element. Keys are distinct within one instance until the counter wraps;
+   * different instances start from independent random values, so their keys may coincide.
+   */
+  private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+    /** The most recently assigned key; seeded in {@link #setup()}. */
+    private int index;
+
+    @Setup
+    public void setup() {
+      this.index = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(KV.of(++index, c.element()));
+    }
+  }
+
+  /**
+   * Matches each input filepattern and emits the metadata of every matched file. Fails with an
+   * {@link IllegalArgumentException} if the match status is not {@code OK}.
+   */
+  private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
+    @ProcessElement
+    public void process(ProcessContext c) throws Exception {
+      MatchResult match = FileSystems.match(c.element());
+      checkArgument(
+          match.status().equals(MatchResult.Status.OK),
+          "Failed to match filepattern %s: %s",
+          c.element(),
+          match.status());
+      for (MatchResult.Metadata metadata : match.metadata()) {
+        c.output(metadata);
+      }
+    }
+  }
+
+  /**
+   * Pairs each file's metadata with the offset ranges to read: several ranges if the file is
+   * read-seek-efficient and {@code isSplittable} accepts its name, otherwise one range covering
+   * the whole file.
+   */
+  private static class SplitIntoRangesFn
+      extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+    private final SerializableFunction<String, Boolean> isSplittable;
+    private final long desiredBundleSizeBytes;
+
+    private SplitIntoRangesFn(
+        SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes) {
+      this.isSplittable = isSplittable;
+      this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      MatchResult.Metadata metadata = c.element();
+      if (!metadata.isReadSeekEfficient()
+          || !isSplittable.apply(metadata.resourceId().toString())) {
+        c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+        return;
+      }
+      for (OffsetRange range :
+          new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
+        c.output(KV.of(metadata, range));
+      }
+    }
+  }
+
+  /**
+   * Reads the records in one offset range of one file, using a source created by {@code
+   * createSource} from the file's name.
+   */
+  private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+    private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+    private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
+      this.createSource = createSource;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) throws IOException {
+      MatchResult.Metadata metadata = c.element().getKey();
+      OffsetRange range = c.element().getValue();
+      // createSource takes a file name, so pass the resource id exactly as SplitIntoRangesFn does
+      // for isSplittable; Metadata.toString() is not guaranteed to be a file name.
+      FileBasedSource<T> source = createSource.apply(metadata.resourceId().toString());
+      try (BoundedSource.BoundedReader<T> reader =
+          source
+              .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+              .createReader(c.getPipelineOptions())) {
+        for (boolean more = reader.start(); more; more = reader.advance()) {
+          c.output(reader.getCurrent());
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 7b4c483..73040da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -37,23 +35,14 @@ import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; -import org.apache.beam.sdk.transforms.Values; import 
org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -356,120 +345,50 @@ public class TextIO { @Override public PCollection<String> expand(PCollection<String> input) { return input - .apply("Expand glob", ParDo.of(new ExpandGlobFn())) .apply( - "Split into ranges", - ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes()))) - .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>()) - .apply("Read", ParDo.of(new ReadTextFn(this))); + "Read all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + new IsSplittableFn(getCompressionType()), + getDesiredBundleSizeBytes(), + new CreateTextSourceFn(getCompressionType()))) + /* ReadAllViaFileBasedSource does not set a coder on its output, so one is set here. */ .setCoder(StringUtf8Coder.of()); } - private static class ReshuffleWithUniqueKey<T> - extends PTransform<PCollection<T>, PCollection<T>> { - @Override - public PCollection<T> expand(PCollection<T> input) { - return input - .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>())) - .apply("Reshuffle", Reshuffle.<Integer, T>of()) - .apply("Values", Values.<T>create()); - } - } - - private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> { - private int index; - - @Setup - public void setup() { - this.index = ThreadLocalRandom.current().nextInt(); - } + /** Adds the configured compression type to this transform's display data. */ @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - @ProcessElement - public void process(ProcessContext c) { - c.output(KV.of(++index, c.element())); - } + builder.add( + DisplayData.item("compressionType", getCompressionType().toString()) + .withLabel("Compression Type")); } - private static class ExpandGlobFn extends DoFn<String, Metadata> { - @ProcessElement - public void process(ProcessContext c) throws Exception { - MatchResult match = FileSystems.match(c.element()); - checkArgument( -
match.status().equals(Status.OK), - "Failed to match filepattern %s: %s", - c.element(), - match.status()); - for (Metadata metadata : match.metadata()) { - c.output(metadata); - } - } - } - - private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> { + /** Creates a {@link TextSource} for the given file name, wrapped via Read.wrapWithCompression according to the configured compression type. */ private static class CreateTextSourceFn + implements SerializableFunction<String, FileBasedSource<String>> { private final CompressionType compressionType; - private final long desiredBundleSize; - private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) { + private CreateTextSourceFn(CompressionType compressionType) { this.compressionType = compressionType; - this.desiredBundleSize = desiredBundleSize; - } - - @ProcessElement - public void process(ProcessContext c) { - Metadata metadata = c.element(); - final boolean isSplittable = isSplittable(metadata, compressionType); - if (!isSplittable) { - c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes()))); - return; - } - for (OffsetRange range : - new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) { - c.output(KV.of(metadata, range)); - } } - static boolean isSplittable(Metadata metadata, CompressionType compressionType) { - if (!metadata.isReadSeekEfficient()) { - return false; - } - switch (compressionType) { - case AUTO: - return !CompressionMode.isCompressed(metadata.resourceId().toString()); - case UNCOMPRESSED: - return true; - case GZIP: - case BZIP2: - case ZIP: - case DEFLATE: - return false; - default: - throw new UnsupportedOperationException("Unknown compression type: " + compressionType); - } + @Override + public FileBasedSource<String> apply(String input) { + return Read.wrapWithCompression( + new TextSource(StaticValueProvider.of(input)), compressionType); } } - private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> { - private final TextIO.ReadAll spec; + /** Returns whether the named file may be split: true for UNCOMPRESSED, or for AUTO when CompressionMode.isCompressed(filename) is false. Whether the file is read-seek-efficient is checked separately by ReadAllViaFileBasedSource. NOTE(review): the removed isSplittable threw UnsupportedOperationException for compression types it did not list; this returns false for them instead. */ private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
+ private final CompressionType compressionType; - private ReadTextFn(ReadAll spec) { - this.spec = spec; + private IsSplittableFn(CompressionType compressionType) { + this.compressionType = compressionType; } - @ProcessElement - public void process(ProcessContext c) throws IOException { - Metadata metadata = c.element().getKey(); - OffsetRange range = c.element().getValue(); - FileBasedSource<String> source = - TextIO.Read.wrapWithCompression( - new TextSource(StaticValueProvider.of(metadata.toString())), - spec.getCompressionType()); - try (BoundedSource.BoundedReader<String> reader = - source - .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo()) - .createReader(c.getPipelineOptions())) { - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } - } + @Override + public Boolean apply(String filename) { + return compressionType == CompressionType.UNCOMPRESSED + || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename)); } } }
