Move over Google Cloud Dataflow worker utilities to worker module

Note that CopyableSeekableByteChannel and LazyMultiReaderIterator
were deleted and not moved because they were only used by TextReader
which has now been removed as well.

This is for Apache Beam.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115271204


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ff52a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ff52a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ff52a0e

Branch: refs/heads/master
Commit: 7ff52a0e184f984254bd9ad4dd71a9f67193e593
Parents: c857afa
Author: lcwik <[email protected]>
Authored: Mon Feb 22 14:33:36 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:25 2016 -0800

----------------------------------------------------------------------
 .../worker/CopyableSeekableByteChannel.java     | 270 ---------------
 .../runners/worker/LazyMultiReaderIterator.java |  87 -----
 .../sdk/runners/worker/ReaderUtils.java         |  74 -----
 .../sdk/util/common/worker/ElementCounter.java  |  32 --
 .../sdk/util/common/worker/Operation.java       | 182 ----------
 .../sdk/util/common/worker/OutputReceiver.java  |  74 -----
 .../sdk/util/common/worker/ProgressTracker.java |  38 ---
 .../common/worker/ProgressTrackerGroup.java     |  71 ----
 .../sdk/util/common/worker/ReadOperation.java   | 333 -------------------
 .../sdk/util/common/worker/Receiver.java        |  27 --
 10 files changed, 1188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/CopyableSeekableByteChannel.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/CopyableSeekableByteChannel.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/CopyableSeekableByteChannel.java
deleted file mode 100644
index 55d25b9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/CopyableSeekableByteChannel.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import static com.google.api.client.util.Preconditions.checkNotNull;
-import static com.google.api.client.util.Preconditions.checkState;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SeekableByteChannel;
-
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * A {@link SeekableByteChannel} that adds copy semantics.
- *
- * <p>This implementation uses a lock to ensure that only one thread accesses
- * the underlying {@code SeekableByteChannel} at any given time.
- *
- * <p>{@link SeekableByteChannel#close} is called on the underlying channel once
- * all {@code CopyableSeekableByteChannel} objects copied from the initial
- * {@code CopyableSeekableByteChannel} are closed.
- *
- * <p>The implementation keeps track of the position of each
- * {@code CopyableSeekableByteChannel}; on access, it synchronizes with the
- * other {@code CopyableSeekableByteChannel} instances accessing the underlying
- * channel, seeks to its own position, performs the operation, updates its local
- * position, and returns the result.
- */
-final class CopyableSeekableByteChannel implements SeekableByteChannel {
-  /** This particular stream's position in the base stream. */
-  private long pos;
-
-  /**
-   * The synchronization object keeping track of the base
-   * {@link SeekableByteChannel}, its reference count, and its current position.
-   * This also doubles as the lock shared by all
-   * {@link CopyableSeekableByteChannel} instances derived from some original
-   * instance.
-   */
-  private final Sync sync;
-
-  /**
-   * Indicates whether this {@link CopyableSeekableByteChannel} is closed.
-   *
-   * <p>Invariant: Unclosed channels own a reference to the base channel,
-   * allowing us to make {@link #close} idempotent.
-   *
-   * <p>This is only modified under the sync lock.
-   */
-  private boolean closed;
-
-  /**
-   * Constructs a new {@link CopyableSeekableByteChannel}.  The supplied base
-   * channel will be closed when this channel and all derived channels are
-   * closed.
-   */
-  public CopyableSeekableByteChannel(SeekableByteChannel base) throws IOException {
-    this(new Sync(base), 0);
-
-    // Update the position to match the original stream's position.
-    //
-    // This doesn't actually need to be synchronized, but it's a little more
-    // obviously correct to always access sync.position while holding sync's
-    // internal monitor.
-    synchronized (sync) {
-      sync.position = base.position();
-      pos = sync.position;
-    }
-  }
-
-  /**
-   * The internal constructor used when deriving a new
-   * {@link CopyableSeekableByteChannel}.
-   *
-   * <p>N.B. This signature is deliberately incompatible with the public
-   * constructor.
-   *
-   * <p>Ordinarily, one would implement copy using a copy constructor, and pass
-   * the object being copied -- but that signature would be compatible with the
-   * public constructor creating a new set of
-   * {@code CopyableSeekableByteChannel} objects for some base channel.  The
-   * copy constructor would still be the one called, since its type is more
-   * specific, but that's fragile; it'd be easy to tweak the signature of the
-   * constructor used for copies without changing callers, which would silently
-   * fall back to using the public constructor.  So instead, we're careful to
-   * give this internal constructor its own unique signature.
-   */
-  private CopyableSeekableByteChannel(Sync sync, long pos) {
-    this.sync = checkNotNull(sync);
-    checkState(sync.base.isOpen(),
-        "the base SeekableByteChannel is not open");
-    synchronized (sync) {
-      sync.refCount++;
-    }
-    this.pos = pos;
-    this.closed = false;
-  }
-
-  /**
-   * Creates a new {@link CopyableSeekableByteChannel} derived from an existing
-   * channel, referencing the same base channel.
-   */
-  public CopyableSeekableByteChannel copy() throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      return new CopyableSeekableByteChannel(sync, pos);
-    }
-  }
-
-  // SeekableByteChannel implementation
-
-  @Override
-  public long position() throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      return pos;
-    }
-  }
-
-  @Override
-  public CopyableSeekableByteChannel position(long newPosition)
-      throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      // Verify that the position is valid for the base channel.
-      sync.base.position(newPosition);
-      this.pos = newPosition;
-      this.sync.position = newPosition;
-    }
-    return this;
-  }
-
-  @Override
-  public int read(ByteBuffer dst) throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      reposition();
-      int bytesRead = sync.base.read(dst);
-      notePositionAdded(bytesRead);
-      return bytesRead;
-    }
-  }
-
-  @Override
-  public long size() throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      return sync.base.size();
-    }
-  }
-
-  @Override
-  public CopyableSeekableByteChannel truncate(long size) throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      sync.base.truncate(size);
-      return this;
-    }
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        throw new ClosedChannelException();
-      }
-      reposition();
-      int bytesWritten = sync.base.write(src);
-      notePositionAdded(bytesWritten);
-      return bytesWritten;
-    }
-  }
-
-  @Override
-  public boolean isOpen() {
-    synchronized (sync) {
-      if (closed) {
-        return false;
-      }
-      return sync.base.isOpen();
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    synchronized (sync) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      sync.refCount--;
-      if (sync.refCount == 0) {
-        sync.base.close();
-      }
-    }
-  }
-
-  /**
-   * Updates the base stream's position to match the position required by this
-   * {@link CopyableSeekableByteChannel}.
-   */
-  @GuardedBy("sync")
-  private void reposition() throws IOException {
-    if (pos != sync.position) {
-      sync.base.position(pos);
-      sync.position = pos;
-    }
-  }
-
-  /**
-   * Notes that the specified amount has been logically added to the current
-   * stream's position.
-   */
-  @GuardedBy("sync")
-  private void notePositionAdded(int amount) {
-    if (amount < 0) {
-      return;  // Handles EOF indicators.
-    }
-    pos += amount;
-    sync.position += amount;
-  }
-
-  /**
-   * A simple value type used to synchronize a set of
-   * {@link CopyableSeekableByteChannel} instances referencing a single
-   * underlying channel.
-   */
-  private static final class Sync {
-    // N.B. Another way to do this would be to implement something like a
-    // RefcountingForwardingSeekableByteChannel.  Doing so would have the
-    // advantage of clearly isolating the mutable state, at the cost of a lot
-    // more code.
-    public final SeekableByteChannel base;
-    @GuardedBy("this") public long refCount = 0;
-    @GuardedBy("this") public long position = 0;
-
-    public Sync(SeekableByteChannel base) throws IOException {
-      this.base = checkNotNull(base);
-      position = base.position();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/LazyMultiReaderIterator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/LazyMultiReaderIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/LazyMultiReaderIterator.java
deleted file mode 100644
index 4a35fa6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/LazyMultiReaderIterator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Implements a ReaderIterator over a collection of inputs.
- *
- * <p>The sources are used sequentially, each consumed entirely before moving
- * to the next source.
- *
- * <p>The input is lazily constructed by using the abstract method {@code open}
- * to create a source iterator for inputs on demand.  This allows the resources
- * to be produced lazily, as an open source iterator may consume process
- * resources such as file descriptors.
- */
-abstract class LazyMultiReaderIterator<T> extends NativeReader.NativeReaderIterator<T> {
-  private final Iterator<String> inputs;
-  private NativeReader.NativeReaderIterator<T> current;
-
-  public LazyMultiReaderIterator(Iterator<String> inputs) {
-    this.inputs = inputs;
-  }
-
-  @Override
-  public boolean start() throws IOException {
-    return advance();
-  }
-
-  @Override
-  public boolean advance() throws IOException {
-    boolean currentStarted = true;
-    while (true) {
-      // Try moving through the current reader
-      if (current != null) {
-        if (currentStarted ? current.advance() : current.start()) {
-          return true;
-        }
-        current.close();
-        current = null;
-      }
-      // Current reader is done - move on to the next one.
-      if (!inputs.hasNext()) {
-        return false;
-      }
-      current = open(inputs.next());
-      currentStarted = false;
-    }
-  }
-
-  @Override
-  public T getCurrent() throws NoSuchElementException {
-    if (current == null) {
-      throw new NoSuchElementException();
-    }
-    return current.getCurrent();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (current != null) {
-      current.close();
-      current = null;
-    }
-  }
-
-  protected abstract NativeReader.NativeReaderIterator<T> open(String input) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ReaderUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ReaderUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ReaderUtils.java
deleted file mode 100644
index 3e065f4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ReaderUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utilities for working with {@link NativeReader} objects.
- */
-public class ReaderUtils {
-  /**
-   * Creates a {@link NativeReader.NativeReaderIterator} from the given {@link NativeReader} and
-   * reads it to the end.
-   *
-   * @param reader {@link NativeReader} to read from
-   */
-  public static <T> List<T> readAllFromReader(NativeReader<T> reader) throws IOException {
-    try (NativeReader.NativeReaderIterator<T> iterator = reader.iterator()) {
-      return readRemainingFromIterator(iterator, false);
-    }
-  }
-
-  /**
-   * Read elements from a {@link NativeReader.NativeReaderIterator} until either the reader is
-   * exhausted, or {@code n} elements are read. Specifying {@code n == Integer.MAX_VALUE} means
-   * read all remaining elements.
-   */
-  public static <T> List<T> readNItemsFromIterator(
-      NativeReader.NativeReaderIterator<T> reader, int n, boolean started) throws IOException {
-    List<T> res = new ArrayList<>();
-    for (long i = 0; n == Integer.MAX_VALUE || i < n; i++) {
-      if (!((i == 0 && !started) ? reader.start() : reader.advance())) {
-        break;
-      }
-      res.add(reader.getCurrent());
-    }
-    return res;
-  }
-
-  /**
-   * Read elements from a {@link NativeReader.NativeReaderIterator} until either the reader is
-   * exhausted, or n elements are read. Specifying {@code n == Integer.MAX_VALUE} means
-   * read all remaining elements.
-   */
-  public static <T> List<T> readNItemsFromUnstartedIterator(
-      NativeReader.NativeReaderIterator<T> reader, int n) throws IOException {
-    return readNItemsFromIterator(reader, n, false);
-  }
-
-  /**
-   * Read elements from a {@link NativeReader.NativeReaderIterator} until the reader is exhausted.
-   */
-  public static <T> List<T> readRemainingFromIterator(
-      NativeReader.NativeReaderIterator<T> reader, boolean started) throws IOException {
-    return readNItemsFromIterator(reader, Integer.MAX_VALUE, started);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ElementCounter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ElementCounter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ElementCounter.java
deleted file mode 100644
index eb90b06..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ElementCounter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-/**
- * Abstract interface that counts elements processed.
- */
-public interface ElementCounter {
-  /**
-   * Updates output counters.
-   */
-  public void update(Object elem) throws Exception;
-
-  /**
-   * Finishes output counters lazy updates.
-   */
-  public void finishLazyUpdate(Object elem);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Operation.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Operation.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Operation.java
deleted file mode 100644
index df7450c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Operation.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-
-/**
- * The abstract base class for Operations, which correspond to
- * Instructions in the original MapTask InstructionGraph.
- *
- * <p>Call start() to start the operation.
- *
- * <p>A read operation's start() method actually reads the data, and in
- * effect runs the pipeline.
- *
- * <p>Call finish() to finish the operation.
- *
- * <p>Since both start() and finish() may call process() on
- * this operation's consumers, start an operation after
- * starting its consumers, and finish an operation before
- * finishing its consumers.
- */
-public abstract class Operation {
-  /** The name of this operation. */
-  public final String operationName;
-
-  /**
-   * The array of consuming receivers, one per operation output
-   * "port" (e.g., DoFn main or side output).  A receiver might be
-   * null if that output isn't being consumed.
-   */
-  public final OutputReceiver[] receivers;
-
-  /**
-   * The possible initialization states of an Operation.
-   * For internal self-checking purposes.
-   */
-  public enum InitializationState {
-    // start() hasn't yet been called.
-    UNSTARTED,
-
-    // start() has been called, but finish() hasn't yet been called.
-    STARTED,
-
-    // finish() has been called.
-    FINISHED
-  }
-
-  /**
-   * The initialization state of this Operation.
-   *
-   * <p>Written from one thread, but can be read by concurrent threads.
-   */
-  public InitializationState initializationState =
-      InitializationState.UNSTARTED;
-
-  /**
-   * The lock protecting the initialization state.
-   *
-   * <p>Subclasses can use this lock to protect their own state.
-   * However, this lock should be held only for short, bounded
-   * amounts of time.
-   */
-  protected final Object initializationStateLock = new Object();
-
-  protected final StateSampler stateSampler;
-
-  protected final int startState;
-  protected final int processState;
-  protected final int finishState;
-
-  public Operation(String operationName,
-                   OutputReceiver[] receivers,
-                   String counterPrefix,
-                   CounterSet.AddCounterMutator addCounterMutator,
-                   StateSampler stateSampler,
-                   StateSampler.StateKind stateKind) {
-    this.operationName = operationName;
-    this.receivers = receivers;
-    this.stateSampler = stateSampler;
-    startState = stateSampler.stateForName(operationName + "-start", stateKind);
-    processState = stateSampler.stateForName(operationName + "-process", stateKind);
-    finishState = stateSampler.stateForName(operationName + "-finish", stateKind);
-  }
-
-  /**
-   * Constructs an operation in the USER state kind.
-   */
-  public Operation(String operationName,
-                   OutputReceiver[] receivers,
-                   String counterPrefix,
-                   CounterSet.AddCounterMutator addCounterMutator,
-                   StateSampler stateSampler) {
-    this(operationName, receivers, counterPrefix, addCounterMutator,
-        stateSampler, StateSampler.StateKind.USER);
-  }
-
-  /**
-   * Checks that this operation is not yet started, throwing an
-   * exception otherwise.
-   */
-  void checkUnstarted() {
-    if (!(initializationState == InitializationState.UNSTARTED
-          || (initializationState == InitializationState.FINISHED
-              && supportsRestart()))) {
-      throw new AssertionError(
-          "expecting this instruction to not yet be started");
-    }
-  }
-
-  /**
-   * Checks that this operation has been started but not yet finished,
-   * throwing an exception otherwise.
-   */
-  void checkStarted() {
-    if (initializationState != InitializationState.STARTED) {
-      throw new AssertionError(
-          "expecting this instruction to be started");
-    }
-  }
-
-  /**
-   * Checks that this operation has been finished, throwing an
-   * exception otherwise.
-   */
-  void checkFinished() {
-    if (initializationState != InitializationState.FINISHED) {
-      throw new AssertionError(
-          "expecting this instruction to be finished");
-    }
-  }
-
-  /**
-   * Returns true if this Operation has been finished.
-   */
-  boolean isFinished() {
-    return (initializationState == InitializationState.FINISHED);
-  }
-
-  /**
-   * Starts this Operation's execution.  Called after all successsor
-   * consuming operations have been started.
-   */
-  public void start() throws Exception {
-    synchronized (initializationStateLock) {
-      checkUnstarted();
-      initializationState = InitializationState.STARTED;
-    }
-  }
-
-  /**
-   * Finishes this Operation's execution.  Called after all
-   * predecessor producing operations have been finished.
-   */
-  public void finish() throws Exception {
-    synchronized (initializationStateLock) {
-      checkStarted();
-      initializationState = InitializationState.FINISHED;
-    }
-  }
-
-  /**
-   * Returns true if this Operation can be started again after it is finished.
-   */
-  public boolean supportsRestart() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/OutputReceiver.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/OutputReceiver.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/OutputReceiver.java
deleted file mode 100644
index 12c77f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/OutputReceiver.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Receiver that forwards each input it receives to each of a list of
- * output Receivers. Additionally, it invokes output counters who track size
- * information for elements passing through.
- */
-public class OutputReceiver implements Receiver {
-  private final List<Receiver> outputs = new ArrayList<>();
-  private final List<ElementCounter> outputCounters = new ArrayList<>();
-
-  /**
-   * Adds a new receiver that this OutputReceiver forwards to.
-   */
-  public void addOutput(Receiver receiver) {
-    outputs.add(receiver);
-  }
-
-  public void addOutputCounter(ElementCounter outputCounter) {
-    outputCounters.add(outputCounter);
-  }
-
-
-  @Override
-  public void process(Object elem) throws Exception {
-    for (ElementCounter counter : outputCounters) {
-      counter.update(elem);
-    }
-
-    // Fan-out.
-    for (Receiver out : outputs) {
-      if (out != null) {
-        out.process(elem);
-      }
-    }
-
-    for (ElementCounter counter : outputCounters) {
-      counter.finishLazyUpdate(elem);
-    }
-  }
-
-  /** Invoked by tests only. */
-  public int getReceiverCount() {
-    return outputs.size();
-  }
-
-  /** Invoked by tests only. */
-  public Receiver getOnlyReceiver() {
-    if (outputs.size() != 1) {
-      throw new AssertionError("only one receiver expected");
-    }
-
-    return outputs.get(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTracker.java
deleted file mode 100644
index b88edeb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTracker.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-/**
- * Provides an interface to an object capable of tracking progress through a
- * collection of elements to be processed.
- *
- * @param <T> the type of elements being tracked
- */
-public interface ProgressTracker<T> {
-  /**
-   * Copies this {@link ProgressTracker}.  The copied tracker will maintain its
-   * own independent notion of the caller's progress through the collection of
-   * elements being processed.
-   */
-  public ProgressTracker<T> copy();
-
-  /**
-   * Reports an element to this {@link ProgressTracker}, as the element is about
-   * to be processed.
-   */
-  public void saw(T element);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTrackerGroup.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTrackerGroup.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTrackerGroup.java
deleted file mode 100644
index 77e05c6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ProgressTrackerGroup.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-/**
- * Implements a group of linked
- * {@link ProgressTracker ProgressTrackers} that
- * collectively track how far a processing loop has gotten through the elements
- * it's processing.  Individual {@code ProgressTracker} instances may be copied,
- * capturing an independent view of the progress of the system; this turns out
- * to be useful for some non-trivial processing loops.  The furthest point
- * reached by any {@code ProgressTracker} is the one reported.
- *
- * <p>This class is abstract.  Its single extension point is
- * {@link #report}, which should be overriden to provide a function that
- * handles the reporting of the supplied element, as appropriate.
- *
- * @param <T> the type of elements being tracked
- */
-public abstract class ProgressTrackerGroup<T> {
-  // TODO: Instead of an abstract class, strongly consider adding an
-  // interface like Receiver to the SDK, so that this class can be final and all
-  // that good stuff.
-  private long nextIndexToReport = 0;
-
-  public ProgressTrackerGroup() {}
-
-  public final ProgressTracker<T> start() {
-    return new Tracker(0);
-  }
-
-  /** Reports the indicated element. */
-  protected abstract void report(T element);
-
-  private final class Tracker implements ProgressTracker<T> {
-    private long nextElementIndex;
-
-    private Tracker(long nextElementIndex) {
-      this.nextElementIndex = nextElementIndex;
-    }
-
-    @Override
-    public ProgressTracker<T> copy() {
-      return new Tracker(nextElementIndex);
-    }
-
-    @Override
-    public void saw(T element) {
-      long thisElementIndex = nextElementIndex;
-      nextElementIndex++;
-      if (thisElementIndex == nextIndexToReport) {
-        nextIndexToReport = nextElementIndex;
-        report(element);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java
deleted file mode 100644
index c46e2a3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/ReadOperation.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import static com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind.SUM;
-
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Observable;
-import java.util.Observer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A read operation.
- *
- * <p>Its start() method iterates through all elements of the source
- * and emits them on its output.
- */
-public class ReadOperation extends Operation {
-  private static final Logger LOG = LoggerFactory.getLogger(ReadOperation.class);
-
-  // This is the rate at which the local, threadsafe progress variable is updated from the iterator,
-  // not the rate of reporting.
-  public static final long DEFAULT_PROGRESS_UPDATE_PERIOD_MS = 100;
-
-  /**
-   * For the reader parallelism counters, large enough values should be sufficient, and there
-   * are issues with arbitrarily large values.
-   *
-   * Specifically, When reporting parallelism as a part of a sum, we want to cap it at a value that
-   * won't impose an artifical constraint on the services view of available parallelism, but small
-   * enough that that adding and subtracting this value for every bundle will not overwhelm values
-   * as small as 1.0.
-   */
-  @VisibleForTesting
-  public static final double LARGE_PARALLELISM_BOUND = 1e7;
-
-  /** The Reader this operation reads from. */
-  public final NativeReader<?> reader;
-
-  /** The total byte counter for all data read by this operation. */
-  final Counter<Long> byteCount;
-
-  /** The counter for estimating total parallelism in this task. */
-  private final Counter<Double> totalParallelismCounter;
-
-  /** The counter for estimating remaining parallelism in this task. */
-  private final Counter<Double> remainingParallelismCounter;
-
-  /**
-   * The Reader's iterator this operation reads from, created by start().
-   *
-   * Guarded by {@link Operation#initializationStateLock}.
-   */
-  volatile NativeReader.NativeReaderIterator<?> readerIterator = null;
-
-  /**
-   * A cache of {@link #readerIterator}'s progress updated inside the read loop
-   * at a bounded rate.
-   *
-   * <p>Necessary so that ReadOperation.getProgress() can return immediately, rather than
-   * potentially wait for a read to complete (which can take an unbounded time, delay a worker
-   * progress update, and cause lease expiration and all sorts of trouble).
-   */
-  private AtomicReference<NativeReader.Progress> progress = new AtomicReference<>();
-
-  /**
-   * On every iteration of the read loop, "progress" is fetched from
-   * {@link #readerIterator} if requested.
-   */
-  private long progressUpdatePeriodMs = DEFAULT_PROGRESS_UPDATE_PERIOD_MS;
-
-  /**
-   * Signals whether the next iteration of the read loop should update the progress.
-   *
-   * <p>Set to true every progressUpdatePeriodMs.
-   */
-  private AtomicBoolean isProgressUpdateRequested = new AtomicBoolean(true);
-
-
-  public ReadOperation(
-      String operationName,
-      NativeReader<?> reader,
-      OutputReceiver[] receivers,
-      String counterPrefix,
-      String systemStageName,
-      CounterSet.AddCounterMutator addCounterMutator,
-      StateSampler stateSampler) {
-    super(operationName, receivers, counterPrefix, addCounterMutator,
-          stateSampler, reader.getStateSamplerStateKind());
-    this.reader = reader;
-    this.byteCount = addCounterMutator.addCounter(
-        Counter.longs(bytesCounterName(counterPrefix, operationName), SUM));
-    reader.addObserver(new ReaderObserver());
-    reader.setStateSamplerAndOperationName(stateSampler, operationName);
-    this.totalParallelismCounter = addCounterMutator.addCounter(
-        Counter.doubles(totalParallelismCounterName(systemStageName), SUM));
-    // Set only when a task is started or split.
-    totalParallelismCounter.resetToValue(boundParallelism(reader.getTotalParallelism()));
-    this.remainingParallelismCounter = addCounterMutator.addCounter(
-        Counter.doubles(remainingParallelismCounterName(systemStageName), SUM));
-  }
-
-  static ReadOperation forTest(
-      NativeReader<?> reader,
-      OutputReceiver outputReceiver,
-      String counterPrefix,
-      CounterSet.AddCounterMutator addCounterMutator,
-      StateSampler stateSampler) {
-    return new ReadOperation("ReadOperation", reader, new OutputReceiver[]{outputReceiver},
-        counterPrefix, "systemStageName", addCounterMutator, stateSampler);
-  }
-
-  public static final long DONT_UPDATE_PERIODICALLY = -1;
-  public static final long UPDATE_ON_EACH_ITERATION = 0;
-
-  /**
-   * Controls the frequency at which progress is updated.  The given value must
-   * be positive or one of the special values of DONT_UPDATE_PERIODICALLY or
-   * UPDATE_ON_EACH_ITERATION.
-   */
-  public void setProgressUpdatePeriodMs(long millis) {
-    assert millis > 0 || millis == DONT_UPDATE_PERIODICALLY || millis == UPDATE_ON_EACH_ITERATION;
-    progressUpdatePeriodMs = millis;
-  }
-
-  protected String bytesCounterName(String counterPrefix, String operationName) {
-    return operationName + "-ByteCount";
-  }
-
-  protected String totalParallelismCounterName(String systemStageName) {
-    return "dataflow_total_parallelism-" + systemStageName;
-  }
-
-  protected String remainingParallelismCounterName(String systemStageName) {
-    return "dataflow_remaining_parallelism-" + systemStageName;
-  }
-
-  public NativeReader<?> getReader() {
-    return reader;
-  }
-
-  @Override
-  public void start() throws Exception {
-    try (StateSampler.ScopedState start = stateSampler.scopedState(startState)) {
-      assert start != null;
-      super.start();
-      runReadLoop();
-    }
-  }
-
-  @Override
-  public boolean supportsRestart() {
-    return reader.supportsRestart();
-  }
-
-  protected void runReadLoop() throws Exception {
-    Receiver receiver = receivers[0];
-    if (receiver == null) {
-      // No consumer of this data; don't do anything.
-      return;
-    }
-
-    try (StateSampler.ScopedState process = stateSampler.scopedState(processState)) {
-      assert process != null;
-      {
-        // Call reader.iterator() outside the lock, because it can take an
-        // unbounded amount of time.
-        NativeReader.NativeReaderIterator<?> iterator = reader.iterator();
-        synchronized (initializationStateLock) {
-          readerIterator = iterator;
-        }
-      }
-
-      // TODO: Consider using the ExecutorService from PipelineOptions instead.
-      Thread updateRequester = null;
-      if (progressUpdatePeriodMs > 0) {
-        updateRequester = new Thread() {
-          @Override
-          public void run() {
-            while (true) {
-              isProgressUpdateRequested.set(true);
-              try {
-                Thread.sleep(progressUpdatePeriodMs);
-              } catch (InterruptedException e) {
-                break;
-              }
-            }
-          }
-        };
-        updateRequester.start();
-      }
-
-      try {
-        // Force a progress update at the beginning and at the end.
-        setProgressFromIterator();
-        for (boolean more = readerIterator.start(); more; more = readerIterator.advance()) {
-          if (isProgressUpdateRequested.getAndSet(false) ||
-              progressUpdatePeriodMs == UPDATE_ON_EACH_ITERATION) {
-            setProgressFromIterator();
-          }
-          receiver.process(readerIterator.getCurrent());
-        }
-        setProgressFromIterator();
-      } finally {
-        if (updateRequester != null) {
-          updateRequester.interrupt();
-          updateRequester.join();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void finish() throws Exception {
-    // Mark operation finished before closing the reader, so that anybody who checks if
-    // it's finished (e.g. requestDynamicSplit) won't use a closed reader.
-    super.finish();
-    readerIterator.close();
-  }
-
-  private void setProgressFromIterator() {
-    try {
-      progress.set(readerIterator.getProgress());
-      remainingParallelismCounter.resetToValue(
-          boundParallelism(readerIterator.getRemainingParallelism()));
-    } catch (UnsupportedOperationException e) {
-      // Ignore: same semantics as null.
-    } catch (Exception e) {
-      // This is not a normal situation, but should not kill the task.
-      LOG.warn("Progress estimation failed", e);
-    }
-  }
-
-  /**
-   * Returns a (possibly slightly stale) value of the progress of the task.
-   * Guaranteed to not block indefinitely. Needs to be thread-safe for sources
-   * which support dynamic work rebalancing.
-   *
-   * @return the task progress, or {@code null} if the source iterator has not
-   * been initialized
-   */
-  public NativeReader.Progress getProgress() {
-    return progress.get();
-  }
-
-  /**
-   * Relays the split request to {@code ReaderIterator}.
-   */
-  public NativeReader.DynamicSplitResult requestDynamicSplit(
-      NativeReader.DynamicSplitRequest splitRequest) {
-    synchronized (initializationStateLock) {
-      if (isFinished()) {
-        LOG.info("Iterator is in the Finished state, returning null stop position.");
-        return null;
-      }
-      if (readerIterator == null) {
-        LOG.info("Iterator has not been initialized, refusing to split at {}", splitRequest);
-        return null;
-      }
-      NativeReader.DynamicSplitResult result = readerIterator.requestDynamicSplit(splitRequest);
-      if (result != null) {
-        // After a successful split, the stop position changed and progress has to be recomputed.
-        setProgressFromIterator();
-        totalParallelismCounter.resetToValue(boundParallelism(reader.getTotalParallelism()));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * This is an observer on the instance of the source. Whenever source reads
-   * an element, update() gets called with the byte size of the element, which
-   * gets added up into the ReadOperation's byte counter.
-   *
-   * <p>Note that when the reader is a {@link GroupingShuffleReader}, update()
-   * is called for each underlying {@link ShuffleEntry} being read, with the
-   * byte size of the {@code ShuffleEntry} - it is not called for each grouped
-   * shuffle element (i.e. key and iterable of values).
-   */
-  private class ReaderObserver implements Observer {
-    @Override
-    public void update(Observable obs, Object obj) {
-      Preconditions.checkArgument(obs == reader, "unexpected observable");
-      Preconditions.checkArgument(obj instanceof Long, "unexpected parameter object");
-      byteCount.addValue((long) obj);
-    }
-  }
-
-  /**
-   * JSON doesn't correctly handle non-finite values, and we want to bound how large each
-   * term in the total sum is.  See {@link #LARGE_PARALLELISM_BOUND}.
-   *
-   * <p>TODO: Remove this hack once we move to gRPC or report this value in a more structured
-   * format.
-   */
-  private static double boundParallelism(double x) {
-    if (Double.isNaN(x) || x < 1) {
-      if (x < 1) {
-        LOG.warn("Invalid parallelism value: " + x);
-      }
-      // Irrational; sums won't come out to an integral value. This is to better avoid
-      // accidental coincidences which would imply the remaining parallelism is zero when it's not.
-      // Also, negative so that it's recognized as "invalid."
-      return -LARGE_PARALLELISM_BOUND * Math.sqrt(2);
-    } else if (x > LARGE_PARALLELISM_BOUND) {
-      return LARGE_PARALLELISM_BOUND;
-    } else {
-      return x;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ff52a0e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Receiver.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Receiver.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Receiver.java
deleted file mode 100644
index ca13ea3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Receiver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-/**
- * Abstract interface of things that accept inputs one at a time via process().
- */
-public interface Receiver {
-  /**
-   * Processes the element.
-   */
-  void process(Object outputElem) throws Exception;
-}


Reply via email to