Move Google Cloud Dataflow worker utilities to worker module

SourceTranslationUtils had a component that was used for custom
source read translation. This component was moved directly into
the read translation so that the native reader source utilities
could move to the worker maven module along with the native reader
base class.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115288815


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1d0c6d0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1d0c6d0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1d0c6d0e

Branch: refs/heads/master
Commit: 1d0c6d0ef0ca24e2b1b7eb330dd0340f696f25ed
Parents: c11af5f
Author: lcwik <[email protected]>
Authored: Mon Feb 22 17:17:35 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:25 2016 -0800

----------------------------------------------------------------------
 .../sdk/runners/dataflow/ReadTranslator.java    |  41 ++-
 .../runners/worker/SourceTranslationUtils.java  | 150 --------
 .../sdk/util/common/worker/NativeReader.java    | 364 -------------------
 3 files changed, 40 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d0c6d0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
index 47a1926..f110e84 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
@@ -16,8 +16,11 @@
 
 package com.google.cloud.dataflow.sdk.runners.dataflow;
 
-import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceToDictionary;
+import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
+import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
+import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
 
+import com.google.api.services.dataflow.model.SourceMetadata;
 import com.google.cloud.dataflow.sdk.io.FileBasedSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.Source;
@@ -28,6 +31,9 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.values.PValue;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
  */
@@ -61,4 +67,37 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
       throw new RuntimeException(e);
     }
   }
+
+  // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
+  // property of CloudWorkflowStep.input.
+  private static Map<String, Object> cloudSourceToDictionary(
+      com.google.api.services.dataflow.model.Source source) {
+    // Do not translate encoding - the source's encoding is translated elsewhere
+    // to the step's output info.
+    Map<String, Object> res = new HashMap<>();
+    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
+    if (source.getMetadata() != null) {
+      addDictionary(res, PropertyNames.SOURCE_METADATA,
+          cloudSourceMetadataToDictionary(source.getMetadata()));
+    }
+    if (source.getDoesNotNeedSplitting() != null) {
+      addBoolean(
+          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
+    }
+    return res;
+  }
+
+  private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
+    Map<String, Object> res = new HashMap<>();
+    if (metadata.getProducesSortedKeys() != null) {
+      addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys());
+    }
+    if (metadata.getEstimatedSizeBytes() != null) {
+      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
+    }
+    if (metadata.getInfinite() != null) {
+      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
+    }
+    return res;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d0c6d0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/SourceTranslationUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/SourceTranslationUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/SourceTranslationUtils.java
deleted file mode 100644
index 6c2c2c2..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/SourceTranslationUtils.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- 
******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary;
-
-import com.google.api.services.dataflow.model.ApproximateReportedProgress;
-import com.google.api.services.dataflow.model.ApproximateSplitRequest;
-import com.google.api.services.dataflow.model.Position;
-import com.google.api.services.dataflow.model.Source;
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Utilities for representing input-specific objects
- * using Dataflow model protos.
- */
-public class SourceTranslationUtils {
-  public static NativeReader.Progress cloudProgressToReaderProgress(
-      @Nullable ApproximateReportedProgress cloudProgress) {
-    return cloudProgress == null ? null : new 
DataflowReaderProgress(cloudProgress);
-  }
-
-  public static NativeReader.Position cloudPositionToReaderPosition(
-      @Nullable Position cloudPosition) {
-    return cloudPosition == null ? null : new 
DataflowReaderPosition(cloudPosition);
-  }
-
-  public static ApproximateReportedProgress readerProgressToCloudProgress(
-      @Nullable NativeReader.Progress readerProgress) {
-    return readerProgress == null ? null : ((DataflowReaderProgress) 
readerProgress).cloudProgress;
-  }
-
-  public static Position toCloudPosition(@Nullable NativeReader.Position 
readerPosition) {
-    return readerPosition == null ? null : ((DataflowReaderPosition) 
readerPosition).cloudPosition;
-  }
-
-  public static ApproximateSplitRequest splitRequestToApproximateSplitRequest(
-      @Nullable NativeReader.DynamicSplitRequest splitRequest) {
-    return (splitRequest == null)
-        ? null : ((DataflowDynamicSplitRequest) splitRequest).splitRequest;
-  }
-
-  public static NativeReader.DynamicSplitRequest toDynamicSplitRequest(
-      @Nullable ApproximateSplitRequest splitRequest) {
-    return (splitRequest == null) ? null : new 
DataflowDynamicSplitRequest(splitRequest);
-  }
-
-  static class DataflowReaderProgress implements NativeReader.Progress {
-    public final ApproximateReportedProgress cloudProgress;
-
-    public DataflowReaderProgress(ApproximateReportedProgress cloudProgress) {
-      this.cloudProgress = cloudProgress;
-    }
-
-    @Override
-    public String toString() {
-      return String.valueOf(cloudProgress);
-    }
-  }
-
-  static class DataflowReaderPosition implements NativeReader.Position {
-    public final Position cloudPosition;
-
-    public DataflowReaderPosition(Position cloudPosition) {
-      this.cloudPosition = cloudPosition;
-    }
-
-    @Override
-    public String toString() {
-      return String.valueOf(cloudPosition);
-    }
-  }
-
-  // Represents a cloud Source as a dictionary for encoding inside the {@code 
SOURCE_STEP_INPUT}
-  // property of CloudWorkflowStep.input.
-  public static Map<String, Object> cloudSourceToDictionary(Source source) {
-    // Do not translate encoding - the source's encoding is translated 
elsewhere
-    // to the step's output info.
-    Map<String, Object> res = new HashMap<>();
-    addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
-    if (source.getMetadata() != null) {
-      addDictionary(res, PropertyNames.SOURCE_METADATA,
-          cloudSourceMetadataToDictionary(source.getMetadata()));
-    }
-    if (source.getDoesNotNeedSplitting() != null) {
-      addBoolean(
-          res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, 
source.getDoesNotNeedSplitting());
-    }
-    return res;
-  }
-
-  private static Map<String, Object> 
cloudSourceMetadataToDictionary(SourceMetadata metadata) {
-    Map<String, Object> res = new HashMap<>();
-    if (metadata.getProducesSortedKeys() != null) {
-      addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, 
metadata.getProducesSortedKeys());
-    }
-    if (metadata.getEstimatedSizeBytes() != null) {
-      addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, 
metadata.getEstimatedSizeBytes());
-    }
-    if (metadata.getInfinite() != null) {
-      addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, 
metadata.getInfinite());
-    }
-    return res;
-  }
-
-  public static Source dictionaryToCloudSource(Map<String, Object> params) 
throws Exception {
-    Source res = new Source();
-    res.setSpec(getDictionary(params, PropertyNames.SOURCE_SPEC));
-    // SOURCE_METADATA and SOURCE_DOES_NOT_NEED_SPLITTING do not have to be
-    // translated, because they only make sense in cloud Source objects 
produced by the user.
-    return res;
-  }
-
-  private static class DataflowDynamicSplitRequest implements 
NativeReader.DynamicSplitRequest {
-    public final ApproximateSplitRequest splitRequest;
-
-    private DataflowDynamicSplitRequest(ApproximateSplitRequest splitRequest) {
-      this.splitRequest = splitRequest;
-    }
-
-    @Override
-    public String toString() {
-      return String.valueOf(splitRequest);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d0c6d0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/NativeReader.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/NativeReader.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/NativeReader.java
deleted file mode 100644
index 20f399f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/NativeReader.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- 
******************************************************************************/
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler.StateKind;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-import java.util.Observable;
-
-import javax.annotation.Nullable;
-
-/**
- * Abstract base class for native readers in the Dataflow runner.
- *
- * <p>A {@link com.google.api.services.dataflow.model.Source} is read from by 
getting an
- * {@code Iterator}-like value and iterating through it.
- *
- * <p>This class is intended for formats that have built-in support on the 
Dataflow service.
- * <b>Do not introduce new implementations:</b> for creating new input 
formats, use
- * {@link com.google.cloud.dataflow.sdk.io.Source} instead.
- *
- * @param <T> the type of the elements read from the source
- */
-public abstract class NativeReader<T> extends Observable {
-  /**
-   * StateSampler object for readers interested in further breaking
-   * down of the state space at a finer granularity.
-   */
-  protected StateSampler stateSampler = null;
-
-  /**
-   * Name to be used as a prefix with {@code stateSampler}.
-   */
-  protected String stateSamplerOperationName = null;
-
-  /**
-   * Sets the state sampler and the state sampler operation name.
-   *
-   * @param stateSampler the {@link StateSampler} object
-   * @param stateSamplerOperationName the operation name to be used by
-   * the state sampler
-   */
-  public void setStateSamplerAndOperationName(
-      StateSampler stateSampler, String stateSamplerOperationName) {
-    this.stateSampler = stateSampler;
-    this.stateSamplerOperationName = stateSamplerOperationName;
-  }
-
-  /**
-   * Returns a ReaderIterator that allows reading from this source.
-   */
-  public abstract NativeReaderIterator<T> iterator() throws IOException;
-
-  /**
-   * A stateful iterator over the data in a {@link NativeReader}.
-   *
-   * <p>Partially thread-safe: methods {@link #start}, {@link #advance}, 
{@link #getCurrent},
-   * {@link #close} are called serially, but {@link #requestDynamicSplit} can 
be called
-   * asynchronously to those.
-   *
-   * <p>There will not be multiple concurrent calls to {@link 
#requestDynamicSplit}).
-   * {@link #getProgress} can be called concurrently to any other call, 
including itself, if
-   * {@link #requestDynamicSplit} is implemented.
-   */
-  public abstract static class NativeReaderIterator<T> implements 
AutoCloseable {
-
-    /**
-     * A value to return from {@link #getRemainingParallelism()} when 
remaining parallelism
-     * can be interpolated from {@link NativeReader#getTotalParallelism} and 
the progress fraction.
-     */
-    public static final double REMAINING_PARALLELISM_FROM_PROGRESS_FRACTION = 
Double.NaN;
-
-    /**
-     * Returns a representation of how far this iterator is through the source.
-     *
-     * @return the progress, or {@code null} if no progress measure can be 
provided
-     * (implementors are discouraged from throwing {@code 
UnsupportedOperationException}
-     * in this case). By default, returns {@code null}.
-     */
-    @Nullable
-    public Progress getProgress() {
-      return null;
-    }
-
-    /**
-     * Attempts to split the input in two parts: the "primary" part and the 
"residual" part.
-     * The current {@link NativeReaderIterator} keeps processing the primary 
part, while the
-     * residual part will be processed elsewhere (e.g. perhaps on a different 
worker).
-     *
-     * <p>The primary and residual parts, if concatenated, must represent the 
same input as the
-     * current input of this {@link NativeReaderIterator} before this call.
-     *
-     * <p>The boundary between the primary part and the residual part is 
specified in
-     * a framework-specific way using {@link 
NativeReader.DynamicSplitRequest}: e.g., if the
-     * framework supports the notion of positions, it might be a position at 
which the input is
-     * asked to split itself (which is not necessarily the same position at 
which it <i>will</i>
-     * split itself); it might be an approximate fraction of input, or 
something else.
-     *
-     * <p>{@link NativeReader.DynamicSplitResult} encodes, in a 
framework-specific way, the
-     * information sufficient to construct a description of the resulting 
primary and
-     * residual inputs.
-     * For example, it might, again, be a position demarcating these parts, or 
it might be a pair of
-     * fully-specified input descriptions, or something else.
-     *
-     * <p>After a successful call to {@link #requestDynamicSplit}, subsequent 
calls should be
-     * interpreted relative to the new primary.
-     *
-     * <p>This call should not affect the range of input represented by the 
{@link NativeReader}
-     * that produced this {@link NativeReaderIterator}.
-     *
-     * @return {@code null} if the {@link NativeReader.DynamicSplitRequest} 
cannot be honored
-     *   (in that case the input represented by this {@link 
NativeReaderIterator} stays the same),
-     *   or a {@link NativeReader.DynamicSplitResult} describing how the input 
was split into
-     *   a primary and residual part. By default, returns {@code null}.
-     */
-    @Nullable
-    public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest request) 
{
-      return null;
-    }
-
-    /**
-     * Returns an estimate of the degree of parallelism that could be achieved 
by
-     * {@link #requestDynamicSplit} taking into account what has already been 
consumed.
-     * E.g., if the reader has just returned the last record in the source, 
the remaining
-     * parallelism is 1 because it can't be split up any further. If the 
reader just
-     * returned the 3rd record in a perfectly parallelizable source with 5 
records,
-     * the remaining parallelism is 3 because it could be processed in 
parallel by this
-     * worker and two others.  If the reader does not support dynamic 
splitting,
-     * the remaining parallelism is always 1.
-     *
-     * <p>An exact number isn't required, mostly we want to be able to 
distinguish
-     * between many, few, or one. Should not block.
-     *
-     * <p>An implementor may return {@link 
#REMAINING_PARALLELISM_FROM_PROGRESS_FRACTION},
-     * in which case the remaining parallelism will be interpolated from
-     * {@link NativeReader#getTotalParallelism} using the current progress 
fraction.
-     * Infinity may also be returned (indicating no known bound on 
parallelism),
-     * as may fractional estimates (in which case the sum over all shards is 
taken).
-     *
-     * <p>By default, returns {@link 
#REMAINING_PARALLELISM_FROM_PROGRESS_FRACTION}.
-     */
-    public double getRemainingParallelism() {
-      return REMAINING_PARALLELISM_FROM_PROGRESS_FRACTION;
-    }
-
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     *
-     * <p>This method should be called exactly once. The invocation should 
occur prior to calling
-     * {@link #advance} or {@link #getCurrent}. This method may perform 
expensive operations that
-     * are needed to initialize the reader.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no 
more input available.
-     */
-    public abstract boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     *
-     * <p>It is an error to call this without having called {@link #start} 
first.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no 
more input available.
-     */
-    public abstract boolean advance() throws IOException;
-
-    /**
-     * Returns the value of the data item that was read by the last {@link 
#start} or
-     * {@link #advance} call. The returned value must be effectively immutable 
and remain valid
-     * indefinitely.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link 
#advance} should
-     * return the same result.
-     *
-     * @throws NoSuchElementException if {@link #start} was never called, or if
-     *         the last {@link #start} or {@link #advance} returned {@code 
false}
-     */
-    public abstract T getCurrent() throws NoSuchElementException;
-
-    /**
-     * @inheritDoc
-     */
-    @Override
-    public void close() throws IOException {
-      // By default, do nothing.
-    }
-  }
-
-  /**
-   * Adapter from old-style reader interface ({@link #hasNext}, {@link #next}) 
to new-style
-   * iteration interface ({@link NativeReaderIterator#start}, {@link 
NativeReaderIterator#advance},
-   * {@link NativeReaderIterator#getCurrent}).
-   *
-   * This class is temporary and the intention is to get rid of its subclasses 
one by one,
-   * converting them to use the new-style interface directly, and then remove 
this class.
-   *
-   * <p>Provides basic treatment of hasNext()/next() to simplify 
implementations (e.g. ensuring
-   * hasNext() is called only once and verifying hasNext() in next()) and 
default no-op
-   * implementations of other operations.
-   *
-   * <p><i>This class is intended for internal usage. Users of Dataflow must 
not subclass it.</i>
-   */
-  public abstract static class LegacyReaderIterator<T> extends 
NativeReaderIterator<T> {
-    private Boolean cachedHasNext;
-    private T current;
-    private boolean hasCurrent;
-
-    public final boolean hasNext() throws IOException {
-      if (cachedHasNext == null) {
-        cachedHasNext = hasNextImpl();
-      }
-      return cachedHasNext;
-    }
-
-    protected abstract boolean hasNextImpl() throws IOException;
-
-    public final T next() throws IOException, NoSuchElementException {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      cachedHasNext = null;
-      return nextImpl();
-    }
-
-    protected abstract T nextImpl() throws IOException;
-
-    @Override
-    public boolean start() throws IOException {
-      hasCurrent = advance();
-      return hasCurrent;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (!hasNext()) {
-        hasCurrent = false;
-        return false;
-      }
-      current = next();
-      hasCurrent = true;
-      return true;
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (!hasCurrent) {
-        throw new NoSuchElementException();
-      }
-      return current;
-    }
-  }
-
-  /**
-   * A representation of how far a {@code ReaderIterator} is through a
-   * {@code Reader}.
-   *
-   * <p>The common worker framework does not interpret instances of
-   * this interface.  But a tool-specific framework can make assumptions
-   * about the implementation, and so the concrete Reader subclasses used
-   * by a tool-specific framework should match.
-   */
-  public interface Progress {}
-
-  /**
-   * A representation of a position in an iteration through a
-   * {@code Reader}.
-   *
-   * <p>See the comment on {@link Progress} for how instances of this
-   * interface are used by the rest of the framework.
-   */
-  public interface Position {}
-
-  /**
-   * A framework-specific way to specify how {@link 
NativeReaderIterator#requestDynamicSplit}
-   * should split the input into a primary and residual part.
-   */
-  public interface DynamicSplitRequest {}
-
-  /**
-   * A framework-specific way to specify how {@link 
NativeReaderIterator#requestDynamicSplit}
-   * has split the input into a primary and residual part.
-   */
-  public interface DynamicSplitResult {}
-
-  /**
-   * A {@link NativeReader.DynamicSplitResult} that specifies the boundary 
between the primary and
-   * residual parts of the input using a {@link Position}.
-   */
-  public static final class DynamicSplitResultWithPosition implements 
DynamicSplitResult {
-    private final Position acceptedPosition;
-
-    public DynamicSplitResultWithPosition(Position acceptedPosition) {
-      this.acceptedPosition = acceptedPosition;
-    }
-
-    public Position getAcceptedPosition() {
-      return acceptedPosition;
-    }
-
-    @Override
-    public String toString() {
-      return String.valueOf(acceptedPosition);
-    }
-  }
-
-  /**
-   * Utility method to notify observers about a new element, which has
-   * been read by this Reader, and its size in bytes. Normally, there
-   * is only one observer, which is a ReadOperation that encapsules
-   * this Reader. Derived classes must call this method whenever they
-   * read additional data, even if that element may never be returned
-   * from the corresponding source iterator.
-   */
-  protected void notifyElementRead(long byteSize) {
-    setChanged();
-    notifyObservers(byteSize);
-  }
-
-  /**
-   * Returns whether this Reader can be restarted.
-   */
-  public boolean supportsRestart() {
-    return false;
-  }
-
-  /**
-   * Returns an estimate of the parallelism of the source being read by this 
reader, i.e.
-   * the number of bundles it could be split into.  An exact number isn't 
required, mostly
-   * we want to be able to distinguish between many, few, or one.  Used to cap 
the parallelism
-   * Dataflow will allocate for this part of the pipeline.  Should not block.
-   *
-   * <p>Defaults to positive infinity, indicating unbounded parallelism.  An 
unsplittable source
-   * would have parallelism exactly 1.
-   *
-   * <p>See also {@link NativeReaderIterator#getRemainingParallelism} which 
may be implemented to
-   * complement this method if a better-than-linear estimate of remaining 
parallelism can be
-   * obtained (e.g. it is easy to detect when one is at the last record).
-   */
-  public double getTotalParallelism() {
-    // By default, don't assume any limitations.
-    return Double.POSITIVE_INFINITY;
-  }
-
-  /**
-   * The default state kind of all the states reported in this reader.
-   * Defaults to {@link StateKind#USER}.
-   */
-  protected StateKind getStateSamplerStateKind() {
-    return StateKind.USER;
-  }
-}

Reply via email to