Repository: incubator-beam Updated Branches: refs/heads/master e01efbda3 -> 12b6ff8d7
Copy and use UnboundedReadFromBoundedSource in dataflow runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f2ac394 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f2ac394 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f2ac394 Branch: refs/heads/master Commit: 4f2ac394c207e0807042013fcab75296d8cf61df Parents: e01efbd Author: Pei He <[email protected]> Authored: Mon Jun 27 17:29:14 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Jun 29 22:45:44 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 34 +- .../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++ .../runners/dataflow/DataflowRunnerTest.java | 30 - 3 files changed, 577 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 70dd94f..2ba6c7b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTran import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.AssignWindows; import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; +import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; @@ -60,6 +61,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.PubsubUnboundedSink; @@ -350,11 +352,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { builder.put(View.AsIterable.class, StreamingViewAsIterable.class); builder.put(Write.Bound.class, StreamingWrite.class); builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); - builder.put(Read.Bounded.class, UnsupportedIO.class); - builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); + builder.put(Read.Bounded.class, StreamingBoundedRead.class); builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); - builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); - builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom Pubsub unbounded source/sink or @@ -2366,6 +2365,33 @@ 
public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** + * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the + * Dataflow runner in streaming mode. + */ + private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>> { + private final BoundedSource<T> source; + + /** Builds an instance of this class from the overridden transform. */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) { + this.source = transform.getSource(); + } + + @Override + protected Coder<T> getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public final PCollection<T> apply(PInput input) { + source.validate(); + + return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source)) + .setIsBoundedInternal(IsBounded.BOUNDED); + } + } + + /** * Specialized implementation for * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the * Dataflow runner in streaming mode. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java new file mode 100644 index 0000000..5e035bc --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.internal; + +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.TimestampedValue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * {@link PTransform} that converts a {@link BoundedSource} into an {@link UnboundedSource}. + * + * <p>The {@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, + * and element timestamps are propagated. While any elements remain, the watermark is the beginning + * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the inner + * {@link BoundedSource}. + * Sources that cannot be split are read entirely into memory, so this transform does not work well + * with large, unsplittable sources. + * + * <p>This transform is intended to be used by a runner during pipeline translation to convert + * a Read.Bounded into a Read.Unbounded. + * + * @deprecated This class is copied from Beam runners core in order to avoid a pipeline construction + * time dependency. It should be replaced in the Dataflow worker as an execution time dependency. + */ +@Deprecated +public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class); + + private final BoundedSource<T> source; + + /** + * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+ */ + public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) { + this.source = source; + } + + @Override + public PCollection<T> apply(PInput input) { + return input.getPipeline().apply( + Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + } + + @Override + protected Coder<T> getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public String getKindString() { + return "Read(" + approximateSimpleName(source.getClass()) + ")"; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data, instead we use the delegate inner source. + builder + .add(DisplayData.item("source", source.getClass())) + .include(source); + } + + /** + * A {@code BoundedSource} to {@code UnboundedSource} adapter. + */ + @VisibleForTesting + public static class BoundedToUnboundedSourceAdapter<T> + extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { + + private BoundedSource<T> boundedSource; + + public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { + this.boundedSource = boundedSource; + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + try { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + if (desiredBundleSize <= 0) { + LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", + boundedSource); + return ImmutableList.of(this); + } + List<? extends BoundedSource<T>> splits + = boundedSource.splitIntoBundles(desiredBundleSize, options); + if (splits == null) { + LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); + return ImmutableList.of(this); + } + return Lists.transform( + splits, + new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { + @Override + public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { + return new BoundedToUnboundedSourceAdapter<>(input); + }}); + } catch (Exception e) { + LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); + return ImmutableList.of(this); + } + } + + @Override + public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) + throws IOException { + if (checkpoint == null) { + return new Reader(null /* residualElements */, boundedSource, options); + } else { + return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Coder<Checkpoint<T>> getCheckpointMarkCoder() { + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + } + + @VisibleForTesting + static class Checkpoint<T> implements UnboundedSource.CheckpointMark { + private final @Nullable List<TimestampedValue<T>> residualElements; + private final @Nullable BoundedSource<T> residualSource; + + public Checkpoint( + @Nullable List<TimestampedValue<T>> residualElements, + @Nullable BoundedSource<T> residualSource) { + this.residualElements = residualElements; + this.residualSource = residualSource; + } + + @Override + public void finalizeCheckpoint() {} + + @VisibleForTesting + @Nullable List<TimestampedValue<T>> getResidualElements() { + return residualElements; + } + 
+ @VisibleForTesting + @Nullable BoundedSource<T> getResidualSource() { + return residualSource; + } + } + + @VisibleForTesting + static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { + + @JsonCreator + public static CheckpointCoder<?> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, + "Expecting 1 components, got %s", components.size()); + return new CheckpointCoder<>(components.get(0)); + } + + // The coder for a list of residual elements and their timestamps + private final Coder<List<TimestampedValue<T>>> elemsCoder; + // The coder from the BoundedReader for coding each element + private final Coder<T> elemCoder; + // The nullable and serializable coder for the BoundedSource. + @SuppressWarnings("rawtypes") + private final Coder<BoundedSource> sourceCoder; + + CheckpointCoder(Coder<T> elemCoder) { + this.elemsCoder = NullableCoder.of( + ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); + this.elemCoder = elemCoder; + this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); + } + + @Override + public void encode(Checkpoint<T> value, OutputStream outStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + elemsCoder.encode(value.residualElements, outStream, nested); + sourceCoder.encode(value.residualSource, outStream, nested); + } + + @SuppressWarnings("unchecked") + @Override + public Checkpoint<T> decode(InputStream inStream, Context context) + throws CoderException, IOException { + Context nested = context.nested(); + return new Checkpoint<>( + elemsCoder.decode(inStream, nested), + sourceCoder.decode(inStream, nested)); + } + + @Override + public List<Coder<?>> getCoderArguments() { + return Arrays.<Coder<?>>asList(elemCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "CheckpointCoder uses Java Serialization, which may be non-deterministic."); + } + } + + /** + * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into + * {@link ResidualElements} and {@link ResidualSource}. + * + * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains + * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will + * be split into {@link ResidualElements} and {@link ResidualSource}. + */ + @VisibleForTesting + class Reader extends UnboundedReader<T> { + private ResidualElements residualElements; + private @Nullable ResidualSource residualSource; + private final PipelineOptions options; + private boolean done; + + Reader( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + init(residualElementsList, residualSource, options); + this.options = checkNotNull(options, "options"); + this.done = false; + } + + private void init( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + this.residualElements = residualElementsList == null + ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) + : new ResidualElements(residualElementsList); + this.residualSource = + residualSource == null ? 
null : new ResidualSource(residualSource, options); + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (residualElements.advance()) { + return true; + } else if (residualSource != null && residualSource.advance()) { + return true; + } else { + done = true; + return false; + } + } + + @Override + public void close() throws IOException { + if (residualSource != null) { + residualSource.close(); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrent(); + } else if (residualSource != null) { + return residualSource.getCurrent(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrentTimestamp(); + } else if (residualSource != null) { + return residualSource.getCurrentTimestamp(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getWatermark() { + return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * {@inheritDoc} + * + * <p>If only part of the {@link ResidualElements} is consumed, the new + * checkpoint will contain the remaining elements in {@link ResidualElements} and + * the {@link ResidualSource}. + * + * <p>If all {@link ResidualElements} and part of the + * {@link ResidualSource} are consumed, the new checkpoint is created by splitting the + * {@link ResidualSource} into a new {@link ResidualElements} and a new {@link ResidualSource}: + * the new {@link ResidualSource} is the residual source produced by the split, + * and the new {@link ResidualElements} holds the elements remaining in the current source after + * the split. For an unsplittable source, all remaining elements are put into + * the {@link ResidualElements}. + */ + @Override + public Checkpoint<T> getCheckpointMark() { + Checkpoint<T> newCheckpoint; + if (!residualElements.done()) { + // Part of residualElements is consumed. + // Checkpoint the remaining elements and the residualSource. + newCheckpoint = new Checkpoint<>( + residualElements.getRestElements(), + residualSource == null ? null : residualSource.getSource()); + } else if (residualSource != null) { + newCheckpoint = residualSource.getCheckpointMark(); + } else { + newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); + } + // Re-initialize since the residualElements and the residualSource might be + // consumed or split by checkpointing.
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); + return newCheckpoint; + } + + @Override + public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + + private class ResidualElements { + private final List<TimestampedValue<T>> elementsList; + private @Nullable Iterator<TimestampedValue<T>> elementsIterator; + private @Nullable TimestampedValue<T> currentT; + private boolean hasCurrent; + private boolean done; + + ResidualElements(List<TimestampedValue<T>> residualElementsList) { + this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); + this.elementsIterator = null; + this.currentT = null; + this.hasCurrent = false; + this.done = false; + } + + public boolean advance() { + if (elementsIterator == null) { + elementsIterator = elementsList.iterator(); + } + if (elementsIterator.hasNext()) { + currentT = elementsIterator.next(); + hasCurrent = true; + return true; + } else { + done = true; + hasCurrent = false; + return false; + } + } + + boolean hasCurrent() { + return hasCurrent; + } + + boolean done() { + return done; + } + + TimestampedValue<T> getCurrentTimestampedValue() { + if (!hasCurrent) { + throw new NoSuchElementException(); + } + return currentT; + } + + T getCurrent() { + return getCurrentTimestampedValue().getValue(); + } + + Instant getCurrentTimestamp() { + return getCurrentTimestampedValue().getTimestamp(); + } + + List<TimestampedValue<T>> getRestElements() { + if (elementsIterator == null) { + return elementsList; + } else { + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + while (elementsIterator.hasNext()) { + newResidualElements.add(elementsIterator.next()); + } + return newResidualElements; + } + } + } + + private class ResidualSource { + private BoundedSource<T> residualSource; + private PipelineOptions options; + private @Nullable BoundedReader<T> reader; + private boolean closed; + + public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { + this.residualSource = checkNotNull(residualSource, "residualSource"); + this.options = checkNotNull(options, "options"); + this.reader = null; + this.closed = false; + } + + private boolean advance() throws IOException { + if (reader == null && !closed) { + reader = residualSource.createReader(options); + return reader.start(); + } else { + return reader.advance(); + } + } + + T getCurrent() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrent(); + } + + Instant getCurrentTimestamp() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrentTimestamp(); + } + + void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + closed = true; + } + + BoundedSource<T> getSource() { + return residualSource; + } + + Checkpoint<T> getCheckpointMark() { + if (reader == null) { + // Reader hasn't started, checkpoint the residualSource. + return new Checkpoint<>(null /* residualElements */, residualSource); + } else { + // Part of residualSource are consumed. + // Splits the residualSource and tracks the new residualElements in current source. 
+ BoundedSource<T> residualSplit = null; + Double fractionConsumed = reader.getFractionConsumed(); + if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { + double fractionRest = 1 - fractionConsumed; + int splitAttempts = 8; + for (int i = 0; i < 8 && residualSplit == null; ++i) { + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; + residualSplit = reader.splitAtFraction(fractionToSplit); + } + } + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + try { + while (advance()) { + newResidualElements.add( + TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("Failed to read elements from the bounded reader.", e); + } + return new Checkpoint<>(newResidualElements, residualSplit); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 999dc3a..0cf1ade 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -56,8 +56,6 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.AvroSource; -import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; @@ -898,34 +896,6 @@ public class DataflowRunnerTest { } @Test - public void testBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); - } - - @Test - public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); - } - - @Test - public void testAvroIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource( - AvroIO.Read.from("foo"), "AvroIO.Read", true); - } - - @Test - public void testTextIOSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); - } - - @Test - public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { - testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); - } - - @Test public void testReadUnboundedUnsupportedInBatch() throws Exception { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); }
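
----------------------------------------------------------------------
Note on usage: with Read.Bounded now mapped to StreamingBoundedRead instead of UnsupportedIO, a streaming Dataflow pipeline may contain bounded reads. The sketch below shows the kind of pipeline this change unblocks. It is illustrative only and not part of the commit; it assumes the SDK at roughly this revision (DataflowPipelineOptions, CountingSource), and the project, bucket, and option values shown are placeholders.

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class StreamingBoundedReadExample {
  public static void main(String[] args) {
    // Placeholder options; a real job needs a valid project and GCS locations.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setStreaming(true);                     // streaming mode
    options.setProject("my-project");               // placeholder
    options.setTempLocation("gs://my-bucket/tmp");  // placeholder

    Pipeline p = Pipeline.create(options);

    // Previously rejected at pipeline construction time via UnsupportedIO.
    // With this change the bounded read is translated through
    // StreamingBoundedRead into DataflowUnboundedReadFromBoundedSource,
    // i.e. an unbounded read whose watermark jumps to TIMESTAMP_MAX_VALUE
    // once the bounded source is exhausted.
    PCollection<Long> values = p.apply(Read.from(CountingSource.upTo(1000)));

    p.run();
  }
}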
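
----------------------------------------------------------------------
The subtle part of the adapter is ResidualSource.getCheckpointMark(): it asks the in-flight BoundedReader to split at the consumed fraction and, if that returns null, retries at up to eight evenly spaced fractions across the remaining range before falling back to reading everything left into ResidualElements. A standalone sketch of that retry schedule, illustrative only and independent of the Beam APIs:

public class SplitFractionScheduleDemo {
  public static void main(String[] args) {
    // Example value; in the adapter this comes from reader.getFractionConsumed().
    double fractionConsumed = 0.25;
    double fractionRest = 1 - fractionConsumed;
    int splitAttempts = 8;
    for (int i = 0; i < splitAttempts; ++i) {
      // In the adapter, the loop stops early as soon as splitAtFraction()
      // returns a non-null residual source.
      double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
      System.out.printf("attempt %d: splitAtFraction(%.5f)%n", i, fractionToSplit);
    }
    // For 0.25 consumed this prints 0.25000, 0.34375, ..., 0.90625. If every
    // attempt fails (an unsplittable source), the remaining elements are read
    // into memory and checkpointed as ResidualElements.
  }
}

(The committed loop bounds the retries with the literal 8 rather than reusing splitAttempts, but the schedule is the same.)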
