/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.cloud.dataflow.sdk.transforms.windowing;

import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.common.collect.Ordering;

import org.joda.time.Instant;

import java.io.Serializable;
import java.util.Collection;

/**
 * The argument to the {@link Window} transform used to assign elements into
 * windows and to determine how windows are merged. See {@link Window} for more
 * information on how {@code WindowFn}s are used and for a library of
 * predefined {@code WindowFn}s.
 *
 * <p>Users will generally want to use the predefined
 * {@code WindowFn}s, but it is also possible to create new
 * subclasses.
 *
 * <p>To create a custom {@code WindowFn}, inherit from this class and override all required
 * methods. If no merging is required, inherit from {@link NonMergingWindowFn}
 * instead. If no merging is required and each element is assigned to a single window, inherit from
 * {@code PartitioningWindowFn}. Inheriting from the most specific subclass will enable more
 * optimizations in the runner.
 *
 * @param <T> type of elements being windowed
 * @param <W> {@link BoundedWindow} subclass used to represent the
 *            windows used by this {@code WindowFn}
 */
public abstract class WindowFn<T, W extends BoundedWindow>
    implements Serializable {
  /**
   * Information available when running {@link #assignWindows}.
   */
  public abstract class AssignContext {
    /**
     * Returns the current element.
     */
    public abstract T element();

    /**
     * Returns the timestamp of the current element.
     */
    public abstract Instant timestamp();

    /**
     * Returns the windows the current element was in, prior to this
     * {@code WindowFn} being called.
     */
    public abstract Collection<? extends BoundedWindow> windows();
  }

  /**
   * Given a timestamp and element, returns the set of windows into which it
   * should be placed.
   */
  public abstract Collection<W> assignWindows(AssignContext c) throws Exception;

  /**
   * Information available when running {@link #mergeWindows}.
   */
  public abstract class MergeContext {
    /**
     * Returns the current set of windows.
     */
    public abstract Collection<W> windows();

    /**
     * Signals to the framework that the windows in {@code toBeMerged} should
     * be merged together to form {@code mergeResult}.
     *
     * <p>{@code toBeMerged} should be a subset of {@link #windows}
     * and disjoint from the {@code toBeMerged} set of previous calls
     * to {@code merge}.
     *
     * <p>{@code mergeResult} must either not be in {@link #windows} or be in
     * {@code toBeMerged}.
     *
     * @throws IllegalArgumentException if any elements of toBeMerged are not
     *         in windows(), or have already been merged
     */
    public abstract void merge(Collection<W> toBeMerged, W mergeResult)
        throws Exception;
  }

  /**
   * Does whatever merging of windows is necessary.
   *
   * <p>See {@link MergeOverlappingIntervalWindows#mergeWindows} for an
   * example of how to override this method.
   */
  public abstract void mergeWindows(MergeContext c) throws Exception;

  /**
   * Returns whether this performs the same merging as the given
   * {@code WindowFn}.
   */
  public abstract boolean isCompatible(WindowFn<?, ?> other);

  /**
   * Returns the {@link Coder} used for serializing the windows used
   * by this {@code WindowFn}.
   */
  public abstract Coder<W> windowCoder();

  /**
   * Returns the window of the side input corresponding to the given window of
   * the main input.
   *
   * <p>Authors of custom {@code WindowFn}s should override this.
   */
  public abstract W getSideInputWindow(final BoundedWindow window);

  /**
   * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate
   *     {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending
   *     {@link OutputTimeFn.Defaults}.
   */
  @Deprecated
  @Experimental(Kind.OUTPUT_TIME)
  public Instant getOutputTime(Instant inputTimestamp, W window) {
    return getOutputTimeFn().assignOutputTime(inputTimestamp, window);
  }

  /**
   * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}.
   * See the full specification there.
   *
   * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably
   * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}.
   *
   * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is
   * suggested that the result in later overlapping windows is past the end of earlier windows so
   * that the later windows don't prevent the watermark from progressing past the end of the earlier
   * window.
   *
   * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next
   * sliding window. See {@link SlidingWindows#getOutputTimeFn}.
   */
  @Experimental(Kind.OUTPUT_TIME)
  public OutputTimeFn<? super W> getOutputTimeFn() {
    return new OutputAtEarliestAssignedTimestamp<>(this);
  }

  /**
   * Returns true if this {@code WindowFn} never needs to merge any windows.
   */
  public boolean isNonMerging() {
    return false;
  }

  /**
   * Returns true if this {@code WindowFn} assigns each element to a single window.
   */
  public boolean assignsToSingleWindow() {
    return false;
  }

  /**
   * A compatibility adapter that will return the assigned timestamps according to the
   * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps
   * on the {@link WindowFn} is now deprecated.
   */
  private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow>
      extends OutputTimeFn.Defaults<W> {

    private final WindowFn<?, W> windowFn;

    public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) {
      this.windowFn = windowFn;
    }

    /**
     * {@inheritDoc}
     *
     * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}.
     */
    @Override
    @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior
    public Instant assignOutputTime(Instant timestamp, W window) {
      return windowFn.getOutputTime(timestamp, window);
    }

    @Override
    public Instant combine(Instant outputTime, Instant otherOutputTime) {
      return Ordering.natural().min(outputTime, otherOutputTime);
    }

    /**
     * {@inheritDoc}
     *
     * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn}
     *         or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends
     *         only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}.
     */
    @Override
    public boolean dependsOnlyOnEarliestInputTimestamp() {
      return true;
    }
  }
}
/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

/**
 * Defines the {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} transform
 * for dividing the elements in a PCollection into windows, and the
 * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for controlling when those
 * elements are output.
 *
 * <p>{@code Window} logically divides up or groups the elements of a
 * {@link com.google.cloud.dataflow.sdk.values.PCollection} into finite windows according to a
 * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}.
 * The output of {@code Window} contains the same elements as input, but they
 * have been logically assigned to windows. The next
 * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}s, including one
 * within composite transforms, will group by the combination of keys and
 * windows.
 *
 * <p>Windowing a {@code PCollection} allows chunks of it to be processed
 * individually, before the entire {@code PCollection} is available. This is
 * especially important for {@code PCollection}s with unbounded size, since the full
 * {@code PCollection} is never available at once.
 *
 * <p>For {@code PCollection}s with a bounded size, by default, all data is implicitly in a
 * single window, and this replicates conventional batch mode. However, windowing can still be a
 * convenient way to express time-sliced algorithms over bounded {@code PCollection}s.
 *
 * <p>As elements are assigned to a window, they are placed into a pane. When the trigger fires
 * all of the elements in the current pane are output.
 *
 * <p>The {@link com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger} will output a
 * window when the system watermark passes the end of the window. See
 * {@link com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark} for details on the
 * watermark.
 */
package com.google.cloud.dataflow.sdk.transforms.windowing;
/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;

import java.util.Collection;
import java.util.Set;

import javax.annotation.Nullable;

/**
 * Track which active windows have their state associated with merged-away windows.
 *
 * <p>When windows are merged we must track which state previously associated with the merged
 * windows must now be associated with the result window. Some of that state may be combined
 * eagerly when the windows are merged. The rest is combined lazily when the final state is
 * actually required when emitting a pane. We keep track of this using an {@link ActiveWindowSet}.
 *
 * <p>An {@link ActiveWindowSet} considers a window to be in one of the following states:
 *
 * <ol>
 *   <li><b>NEW</b>: The initial state for a window on an incoming element; we do not yet know
 *       if it should be merged into an ACTIVE window, or whether it is already present as an
 *       ACTIVE window, since we have not yet called
 *       {@link WindowFn#mergeWindows}.</li>
 *   <li><b>ACTIVE</b>: A window that has state associated with it and has not itself been merged
 *       away. The window may have one or more <i>state address</i> windows under which its
 *       non-empty state is stored. A state value for an ACTIVE window must be derived by reading
 *       the state in all of its state address windows.</li>
 *   <li><b>EPHEMERAL</b>: A NEW window that has been merged into an ACTIVE window before any state
 *       has been associated with that window. Thus the window is neither ACTIVE nor MERGED. These
 *       windows are not persistently represented since if they reappear the merge function should
 *       again redirect them to an ACTIVE window. EPHEMERAL windows are an optimization for
 *       the common case of in-order events and {@link Sessions session window} by never associating
 *       state with windows that are created and immediately merged away.</li>
 *   <li><b>MERGED</b>: An ACTIVE window has been merged into another ACTIVE window after it had
 *       state associated with it. The window will thus appear as a state address window for exactly
 *       one ACTIVE window.</li>
 *   <li><b>EXPIRED</b>: The window has expired and may have been garbage collected. No new elements
 *       (even late elements) will ever be assigned to that window. These windows are not explicitly
 *       represented anywhere; it is expected that the user of {@link ActiveWindowSet} will store
 *       no state associated with the window.</li>
 * </ol>
 *
 * <p>If no windows will ever be merged we can use the trivial implementation {@link
 * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this data structure is in
 * {@link MergingActiveWindowSet}.
 *
 * @param <W> the type of window being managed
 */
public interface ActiveWindowSet<W extends BoundedWindow> {
  /**
   * Callback for {@link #merge}.
   */
  public interface MergeCallback<W extends BoundedWindow> {
    /**
     * Called when windows are about to be merged, but before any {@link #onMerge} callback
     * has been made.
     */
    void prefetchOnMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
        throws Exception;

    /**
     * Called when windows are about to be merged, after all {@link #prefetchOnMerge} calls
     * have been made, but before the active window set has been updated to reflect the merge.
     *
     * @param toBeMerged the windows about to be merged.
     * @param activeToBeMerged the subset of {@code toBeMerged} corresponding to windows which
     *        are currently ACTIVE (and about to be merged). The remaining windows have been deemed
     *        EPHEMERAL, and thus have no state associated with them.
     * @param mergeResult the result window, either a member of {@code toBeMerged} or new.
     */
    void onMerge(Collection<W> toBeMerged, Collection<W> activeToBeMerged, W mergeResult)
        throws Exception;
  }

  /**
   * Remove EPHEMERAL windows since we only need to know about them while processing new elements.
   */
  void removeEphemeralWindows();

  /**
   * Save any state changes needed.
   */
  void persist();

  /**
   * Return the ACTIVE window into which {@code window} has been merged.
   * Return {@code window} itself if it is ACTIVE. Return null if {@code window} has not
   * yet been seen.
   */
  @Nullable
  W representative(W window);

  /**
   * Return (a view of) the set of currently ACTIVE windows.
   */
  Set<W> getActiveWindows();

  /**
   * Return {@code true} if {@code window} is ACTIVE.
   */
  boolean isActive(W window);

  /**
   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
   * as NEW. All NEW windows will be accounted for as ACTIVE, MERGED or EPHEMERAL by a call
   * to {@link #merge}.
   */
  void addNew(W window);

  /**
   * If {@code window} is not already known to be ACTIVE, MERGED or EPHEMERAL then add it
   * as ACTIVE.
   */
  void addActive(W window);

  /**
   * Remove {@code window} from the set.
   */
  void remove(W window);

  /**
   * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated with this window set,
   * merging as many of the active windows as possible. {@code mergeCallback} will be invoked for
   * each group of windows that are merged. After this no NEW windows will remain, all merge
   * result windows will be ACTIVE, and all windows which have been merged away will not be ACTIVE.
   */
  void merge(MergeCallback<W> mergeCallback) throws Exception;

  /**
   * Signal that all state in {@link #readStateAddresses} for {@code window} has been merged into
   * the {@link #writeStateAddress} for {@code window}.
   */
  void merged(W window);

  /**
   * Return the state address windows for ACTIVE {@code window} from which all state associated
   * should be read and merged.
   */
  Set<W> readStateAddresses(W window);

  /**
   * Return the state address window of ACTIVE {@code window} into which all new state should be
   * written. Always one of the results of {@link #readStateAddresses}.
   */
  W writeStateAddress(W window);

  /**
   * Return the state address window into which all new state should be written after
   * ACTIVE windows {@code toBeMerged} have been merged into {@code mergeResult}.
   */
  W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
}
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import com.google.common.base.Joiner; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.Sets; -import com.google.common.reflect.ClassPath; -import com.google.common.reflect.ClassPath.ClassInfo; -import com.google.common.reflect.Invokable; -import com.google.common.reflect.Parameter; -import com.google.common.reflect.TypeToken; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.annotation.Annotation; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.GenericArrayType; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.lang.reflect.WildcardType; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -/** - * Represents the API surface of a package prefix. Used for accessing public classes, - * methods, and the types they reference, to control what dependencies are re-exported. 
- * - * <p>For the purposes of calculating the public API surface, exposure includes any public - * or protected occurrence of: - * - * <ul> - * <li>superclasses - * <li>interfaces implemented - * <li>actual type arguments to generic types - * <li>array component types - * <li>method return types - * <li>method parameter types - * <li>type variable bounds - * <li>wildcard bounds - * </ul> - * - * <p>Exposure is a transitive property. The resulting map excludes primitives - * and array classes themselves. - * - * <p>It is prudent (though not required) to prune prefixes like "java" via the builder - * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references - * that are not interesting. - */ -@SuppressWarnings("rawtypes") -public class ApiSurface { - private static Logger logger = LoggerFactory.getLogger(ApiSurface.class); - - /** - * Returns an empty {@link ApiSurface}. - */ - public static ApiSurface empty() { - logger.debug("Returning an empty ApiSurface"); - return new ApiSurface(Collections.<Class<?>>emptySet(), Collections.<Pattern>emptySet()); - } - - /** - * Returns an {@link ApiSurface} object representing the given package and all subpackages. - */ - public static ApiSurface ofPackage(String packageName) throws IOException { - return ApiSurface.empty().includingPackage(packageName); - } - - /** - * Returns an {@link ApiSurface} object representing just the surface of the given class. - */ - public static ApiSurface ofClass(Class<?> clazz) { - return ApiSurface.empty().includingClass(clazz); - } - - /** - * Returns an {@link ApiSurface} like this one, but also including the named - * package and all of its subpackages. 
- */ - public ApiSurface includingPackage(String packageName) throws IOException { - ClassPath classPath = ClassPath.from(ClassLoader.getSystemClassLoader()); - - Set<Class<?>> newRootClasses = Sets.newHashSet(); - for (ClassInfo classInfo : classPath.getTopLevelClassesRecursive(packageName)) { - Class clazz = classInfo.load(); - if (exposed(clazz.getModifiers())) { - newRootClasses.add(clazz); - } - } - logger.debug("Including package {} and subpackages: {}", packageName, newRootClasses); - newRootClasses.addAll(rootClasses); - - return new ApiSurface(newRootClasses, patternsToPrune); - } - - /** - * Returns an {@link ApiSurface} like this one, but also including the given class. - */ - public ApiSurface includingClass(Class<?> clazz) { - Set<Class<?>> newRootClasses = Sets.newHashSet(); - logger.debug("Including class {}", clazz); - newRootClasses.add(clazz); - newRootClasses.addAll(rootClasses); - return new ApiSurface(newRootClasses, patternsToPrune); - } - - /** - * Returns an {@link ApiSurface} like this one, but pruning transitive - * references from classes whose full name (including package) begins with the provided prefix. - */ - public ApiSurface pruningPrefix(String prefix) { - return pruningPattern(Pattern.compile(Pattern.quote(prefix) + ".*")); - } - - /** - * Returns an {@link ApiSurface} like this one, but pruning references from the named - * class. - */ - public ApiSurface pruningClassName(String className) { - return pruningPattern(Pattern.compile(Pattern.quote(className))); - } - - /** - * Returns an {@link ApiSurface} like this one, but pruning references from the - * provided class. - */ - public ApiSurface pruningClass(Class<?> clazz) { - return pruningClassName(clazz.getName()); - } - - /** - * Returns an {@link ApiSurface} like this one, but pruning transitive - * references from classes whose full name (including package) begins with the provided prefix. 
- */ - public ApiSurface pruningPattern(Pattern pattern) { - Set<Pattern> newPatterns = Sets.newHashSet(); - newPatterns.addAll(patternsToPrune); - newPatterns.add(pattern); - return new ApiSurface(rootClasses, newPatterns); - } - - /** - * See {@link #pruningPattern(Pattern)}. - */ - public ApiSurface pruningPattern(String patternString) { - return pruningPattern(Pattern.compile(patternString)); - } - - /** - * Returns all public classes originally belonging to the package - * in the {@link ApiSurface}. - */ - public Set<Class<?>> getRootClasses() { - return rootClasses; - } - - /** - * Returns exposed types in this set, including arrays and primitives as - * specified. - */ - public Set<Class<?>> getExposedClasses() { - return getExposedToExposers().keySet(); - } - - /** - * Returns a path from an exposed class to a root class. There may be many, but this - * gives only one. - * - * <p>If there are only cycles, with no path back to a root class, throws - * IllegalStateException. - */ - public List<Class<?>> getAnyExposurePath(Class<?> exposedClass) { - Set<Class<?>> excluded = Sets.newHashSet(); - excluded.add(exposedClass); - List<Class<?>> path = getAnyExposurePath(exposedClass, excluded); - if (path == null) { - throw new IllegalArgumentException( - "Class " + exposedClass + " has no path back to any root class." - + " It should never have been considered exposed."); - } else { - return path; - } - } - - /** - * Returns a path from an exposed class to a root class. There may be many, but this - * gives only one. It will not return a path that crosses the excluded classes. - * - * <p>If there are only cycles or paths through the excluded classes, returns null. 
- * - * <p>If the class is not actually in the exposure map, throws IllegalArgumentException - */ - private List<Class<?>> getAnyExposurePath(Class<?> exposedClass, Set<Class<?>> excluded) { - List<Class<?>> exposurePath = Lists.newArrayList(); - exposurePath.add(exposedClass); - - Collection<Class<?>> exposers = getExposedToExposers().get(exposedClass); - if (exposers.isEmpty()) { - throw new IllegalArgumentException("Class " + exposedClass + " is not exposed."); - } - - for (Class<?> exposer : exposers) { - if (excluded.contains(exposer)) { - continue; - } - - // A null exposer means this is already a root class. - if (exposer == null) { - return exposurePath; - } - - List<Class<?>> restOfPath = getAnyExposurePath( - exposer, - Sets.union(excluded, Sets.newHashSet(exposer))); - - if (restOfPath != null) { - exposurePath.addAll(restOfPath); - return exposurePath; - } - } - return null; - } - - //////////////////////////////////////////////////////////////////// - - // Fields initialized upon construction - private final Set<Class<?>> rootClasses; - private final Set<Pattern> patternsToPrune; - - // Fields computed on-demand - private Multimap<Class<?>, Class<?>> exposedToExposers = null; - private Pattern prunedPattern = null; - private Set<Type> visited = null; - - private ApiSurface(Set<Class<?>> rootClasses, Set<Pattern> patternsToPrune) { - this.rootClasses = rootClasses; - this.patternsToPrune = patternsToPrune; - } - - /** - * A map from exposed types to place where they are exposed, in the sense of being a part - * of a public-facing API surface. - * - * <p>This map is the adjencency list representation of a directed graph, where an edge from type - * {@code T1} to type {@code T2} indicates that {@code T2} directly exposes {@code T1} in its API - * surface. - * - * <p>The traversal methods in this class are designed to avoid repeatedly processing types, since - * there will almost always be cyclic references. 
- */ - private Multimap<Class<?>, Class<?>> getExposedToExposers() { - if (exposedToExposers == null) { - constructExposedToExposers(); - } - return exposedToExposers; - } - - /** - * See {@link #getExposedToExposers}. - */ - private void constructExposedToExposers() { - visited = Sets.newHashSet(); - exposedToExposers = Multimaps.newSetMultimap( - Maps.<Class<?>, Collection<Class<?>>>newHashMap(), - new Supplier<Set<Class<?>>>() { - @Override - public Set<Class<?>> get() { - return Sets.newHashSet(); - } - }); - - for (Class<?> clazz : rootClasses) { - addExposedTypes(clazz, null); - } - } - - /** - * A combined {@code Pattern} that implements all the pruning specified. - */ - private Pattern getPrunedPattern() { - if (prunedPattern == null) { - constructPrunedPattern(); - } - return prunedPattern; - } - - /** - * See {@link #getPrunedPattern}. - */ - private void constructPrunedPattern() { - Set<String> prunedPatternStrings = Sets.newHashSet(); - for (Pattern patternToPrune : patternsToPrune) { - prunedPatternStrings.add(patternToPrune.pattern()); - } - prunedPattern = Pattern.compile("(" + Joiner.on(")|(").join(prunedPatternStrings) + ")"); - } - - /** - * Whether a type and all that it references should be pruned from the graph. - */ - private boolean pruned(Type type) { - return pruned(TypeToken.of(type).getRawType()); - } - - /** - * Whether a class and all that it references should be pruned from the graph. - */ - private boolean pruned(Class<?> clazz) { - return clazz.isPrimitive() - || clazz.isArray() - || getPrunedPattern().matcher(clazz.getName()).matches(); - } - - /** - * Whether a type has already beens sufficiently processed. 
- */ - private boolean done(Type type) { - return visited.contains(type); - } - - private void recordExposure(Class<?> exposed, Class<?> cause) { - exposedToExposers.put(exposed, cause); - } - - private void recordExposure(Type exposed, Class<?> cause) { - exposedToExposers.put(TypeToken.of(exposed).getRawType(), cause); - } - - private void visit(Type type) { - visited.add(type); - } - - /** - * See {@link #addExposedTypes(Type, Class)}. - */ - private void addExposedTypes(TypeToken type, Class<?> cause) { - logger.debug( - "Adding exposed types from {}, which is the type in type token {}", type.getType(), type); - addExposedTypes(type.getType(), cause); - } - - /** - * Adds any references learned by following a link from {@code cause} to {@code type}. - * This will dispatch according to the concrete {@code Type} implementation. See the - * other overloads of {@code addExposedTypes} for their details. - */ - private void addExposedTypes(Type type, Class<?> cause) { - if (type instanceof TypeVariable) { - logger.debug("Adding exposed types from {}, which is a type variable", type); - addExposedTypes((TypeVariable) type, cause); - } else if (type instanceof WildcardType) { - logger.debug("Adding exposed types from {}, which is a wildcard type", type); - addExposedTypes((WildcardType) type, cause); - } else if (type instanceof GenericArrayType) { - logger.debug("Adding exposed types from {}, which is a generic array type", type); - addExposedTypes((GenericArrayType) type, cause); - } else if (type instanceof ParameterizedType) { - logger.debug("Adding exposed types from {}, which is a parameterized type", type); - addExposedTypes((ParameterizedType) type, cause); - } else if (type instanceof Class) { - logger.debug("Adding exposed types from {}, which is a class", type); - addExposedTypes((Class) type, cause); - } else { - throw new IllegalArgumentException("Unknown implementation of Type"); - } - } - - /** - * Adds any types exposed to this set. 
These will - * come from the (possibly absent) bounds on the - * type variable. - */ - private void addExposedTypes(TypeVariable type, Class<?> cause) { - if (done(type)) { - return; - } - visit(type); - for (Type bound : type.getBounds()) { - logger.debug("Adding exposed types from {}, which is a type bound on {}", bound, type); - addExposedTypes(bound, cause); - } - } - - /** - * Adds any types exposed to this set. These will come from the (possibly absent) bounds on the - * wildcard. - */ - private void addExposedTypes(WildcardType type, Class<?> cause) { - visit(type); - for (Type lowerBound : type.getLowerBounds()) { - logger.debug( - "Adding exposed types from {}, which is a type lower bound on wildcard type {}", - lowerBound, - type); - addExposedTypes(lowerBound, cause); - } - for (Type upperBound : type.getUpperBounds()) { - logger.debug( - "Adding exposed types from {}, which is a type upper bound on wildcard type {}", - upperBound, - type); - addExposedTypes(upperBound, cause); - } - } - - /** - * Adds any types exposed from the given array type. The array type itself is not added. The - * cause of the exposure of the underlying type is considered whatever type exposed the array - * type. - */ - private void addExposedTypes(GenericArrayType type, Class<?> cause) { - if (done(type)) { - return; - } - visit(type); - logger.debug( - "Adding exposed types from {}, which is the component type on generic array type {}", - type.getGenericComponentType(), - type); - addExposedTypes(type.getGenericComponentType(), cause); - } - - /** - * Adds any types exposed to this set. Even if the - * root type is to be pruned, the actual type arguments - * are processed. 
- */ - private void addExposedTypes(ParameterizedType type, Class<?> cause) { - // Even if the type is already done, this link to it may be new - boolean alreadyDone = done(type); - if (!pruned(type)) { - visit(type); - recordExposure(type, cause); - } - if (alreadyDone) { - return; - } - - // For a parameterized type, pruning does not take place - // here, only for the raw class. - // The type parameters themselves may not be pruned, - // for example with List<MyApiType> probably the - // standard List is pruned, but MyApiType is not. - logger.debug( - "Adding exposed types from {}, which is the raw type on parameterized type {}", - type.getRawType(), - type); - addExposedTypes(type.getRawType(), cause); - for (Type typeArg : type.getActualTypeArguments()) { - logger.debug( - "Adding exposed types from {}, which is a type argument on parameterized type {}", - typeArg, - type); - addExposedTypes(typeArg, cause); - } - } - - /** - * Adds a class and all of the types it exposes. The cause - * of the class being exposed is given, and the cause - * of everything within the class is that class itself. 
- */ - private void addExposedTypes(Class<?> clazz, Class<?> cause) { - if (pruned(clazz)) { - return; - } - // Even if `clazz` has been visited, the link from `cause` may be new - boolean alreadyDone = done(clazz); - visit(clazz); - recordExposure(clazz, cause); - if (alreadyDone || pruned(clazz)) { - return; - } - - TypeToken<?> token = TypeToken.of(clazz); - for (TypeToken<?> superType : token.getTypes()) { - if (!superType.equals(token)) { - logger.debug( - "Adding exposed types from {}, which is a super type token on {}", superType, clazz); - addExposedTypes(superType, clazz); - } - } - for (Class innerClass : clazz.getDeclaredClasses()) { - if (exposed(innerClass.getModifiers())) { - logger.debug( - "Adding exposed types from {}, which is an exposed inner class of {}", - innerClass, - clazz); - addExposedTypes(innerClass, clazz); - } - } - for (Field field : clazz.getDeclaredFields()) { - if (exposed(field.getModifiers())) { - logger.debug("Adding exposed types from {}, which is an exposed field on {}", field, clazz); - addExposedTypes(field, clazz); - } - } - for (Invokable invokable : getExposedInvokables(token)) { - logger.debug( - "Adding exposed types from {}, which is an exposed invokable on {}", invokable, clazz); - addExposedTypes(invokable, clazz); - } - } - - private void addExposedTypes(Invokable<?, ?> invokable, Class<?> cause) { - addExposedTypes(invokable.getReturnType(), cause); - for (Annotation annotation : invokable.getAnnotations()) { - logger.debug( - "Adding exposed types from {}, which is an annotation on invokable {}", - annotation, - invokable); - addExposedTypes(annotation.annotationType(), cause); - } - for (Parameter parameter : invokable.getParameters()) { - logger.debug( - "Adding exposed types from {}, which is a parameter on invokable {}", - parameter, - invokable); - addExposedTypes(parameter, cause); - } - for (TypeToken<?> exceptionType : invokable.getExceptionTypes()) { - logger.debug( - "Adding exposed types from {}, which 
is an exception type on invokable {}", - exceptionType, - invokable); - addExposedTypes(exceptionType, cause); - } - } - - private void addExposedTypes(Parameter parameter, Class<?> cause) { - logger.debug( - "Adding exposed types from {}, which is the type of parameter {}", - parameter.getType(), - parameter); - addExposedTypes(parameter.getType(), cause); - for (Annotation annotation : parameter.getAnnotations()) { - logger.debug( - "Adding exposed types from {}, which is an annotation on parameter {}", - annotation, - parameter); - addExposedTypes(annotation.annotationType(), cause); - } - } - - private void addExposedTypes(Field field, Class<?> cause) { - addExposedTypes(field.getGenericType(), cause); - for (Annotation annotation : field.getDeclaredAnnotations()) { - logger.debug( - "Adding exposed types from {}, which is an annotation on field {}", annotation, field); - addExposedTypes(annotation.annotationType(), cause); - } - } - - /** - * Returns an {@link Invokable} for each public methods or constructors of a type. - */ - private Set<Invokable> getExposedInvokables(TypeToken<?> type) { - Set<Invokable> invokables = Sets.newHashSet(); - - for (Constructor constructor : type.getRawType().getConstructors()) { - if (0 != (constructor.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) { - invokables.add(type.constructor(constructor)); - } - } - - for (Method method : type.getRawType().getMethods()) { - if (0 != (method.getModifiers() & (Modifier.PUBLIC | Modifier.PROTECTED))) { - invokables.add(type.method(method)); - } - } - - return invokables; - } - - /** - * Returns true of the given modifier bitmap indicates exposure (public or protected access). 
- */ - private boolean exposed(int modifiers) { - return 0 != (modifiers & (Modifier.PUBLIC | Modifier.PROTECTED)); - } - - - //////////////////////////////////////////////////////////////////////////// - - public static ApiSurface getSdkApiSurface() throws IOException { - return ApiSurface.ofPackage("com.google.cloud.dataflow") - .pruningPattern("com[.]google[.]cloud[.]dataflow.*Test") - .pruningPattern("com[.]google[.]cloud[.]dataflow.*Benchmark") - .pruningPrefix("com.google.cloud.dataflow.integration") - .pruningPrefix("java") - .pruningPrefix("com.google.api") - .pruningPrefix("com.google.auth") - .pruningPrefix("com.google.bigtable.v1") - .pruningPrefix("com.google.cloud.bigtable.config") - .pruningPrefix("com.google.cloud.bigtable.grpc.Bigtable*Name") - .pruningPrefix("com.google.protobuf") - .pruningPrefix("org.joda.time") - .pruningPrefix("org.apache.avro") - .pruningPrefix("org.junit") - .pruningPrefix("com.fasterxml.jackson.annotation"); - } - - public static void main(String[] args) throws Exception { - List<String> names = Lists.newArrayList(); - for (Class clazz : getSdkApiSurface().getExposedClasses()) { - names.add(clazz.getName()); - } - List<String> sortedNames = Lists.newArrayList(names); - Collections.sort(sortedNames); - - for (String name : sortedNames) { - System.out.println(name); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java deleted file mode 100644 index c7fe4b4..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppEngineEnvironment.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import java.lang.reflect.InvocationTargetException; - -/** Stores whether we are running within AppEngine or not. */ -public class AppEngineEnvironment { - /** - * True if running inside of AppEngine, false otherwise. - */ - @Deprecated - public static final boolean IS_APP_ENGINE = isAppEngine(); - - /** - * Attempts to detect whether we are inside of AppEngine. - * - * <p>Purposely copied and left private from private <a href="https://code.google.com/p/ - * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/ - * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>. - * - * @return true if we are inside of AppEngine, false otherwise. - */ - static boolean isAppEngine() { - if (System.getProperty("com.google.appengine.runtime.environment") == null) { - return false; - } - try { - // If the current environment is null, we're not inside AppEngine. - return Class.forName("com.google.apphosting.api.ApiProxy") - .getMethod("getCurrentEnvironment") - .invoke(null) != null; - } catch (ClassNotFoundException e) { - // If ApiProxy doesn't exist, we're not on AppEngine at all. - return false; - } catch (InvocationTargetException e) { - // If ApiProxy throws an exception, we're not in a proper AppEngine environment. 
- return false; - } catch (IllegalAccessException e) { - // If the method isn't accessible, we're not on a supported version of AppEngine; - return false; - } catch (NoSuchMethodException e) { - // If the method doesn't exist, we're not on a supported version of AppEngine; - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java deleted file mode 100644 index 512d72d..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AppliedCombineFn.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.common.annotations.VisibleForTesting;

import java.io.Serializable;

/**
 * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a
 * specific application of the {@link KeyedCombineFnWithContext}.
 *
 * <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>}
 * may depend on the {@code Coder<InputT>}.
 *
 * @param <K> type of keys
 * @param <InputT> type of input values
 * @param <AccumT> type of mutable accumulator values
 * @param <OutputT> type of output values
 */
public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable {

  // The combine fn for this application. All public factories clone the caller's
  // instance (via SerializableUtils.clone), so later mutation of the original
  // cannot affect this object.
  private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn;
  // Accumulator coder fixed for this specific application.
  private final Coder<AccumT> accumulatorCoder;

  // May be null when constructed via the two-argument withAccumulatorCoder overload.
  private final Iterable<PCollectionView<?>> sideInputViews;
  // May be null; the KV input coder this combine was applied to.
  private final KvCoder<K, InputT> kvCoder;
  // May be null; the windowing strategy of the input.
  private final WindowingStrategy<?, ?> windowingStrategy;

  private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
      Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
      KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
    this.fn = fn;
    this.accumulatorCoder = accumulatorCoder;
    this.sideInputViews = sideInputViews;
    this.kvCoder = kvCoder;
    this.windowingStrategy = windowingStrategy;
  }

  /**
   * Creates an {@link AppliedCombineFn} with the given accumulator coder and no side input
   * views, input coder, or windowing strategy.
   */
  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
      withAccumulatorCoder(
          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
          Coder<AccumT> accumCoder) {
    return withAccumulatorCoder(fn, accumCoder, null, null, null);
  }

  /**
   * Creates an {@link AppliedCombineFn} with an explicitly supplied accumulator coder,
   * cloning {@code fn} so the caller's instance is not shared.
   */
  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
      withAccumulatorCoder(
          PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
          Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews,
          KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
    // Casting down the K and InputT is safe because they're only used as inputs.
    @SuppressWarnings("unchecked")
    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
    return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy);
  }

  /**
   * Creates an {@link AppliedCombineFn}, inferring the accumulator coder from the input
   * coder, with no side input views or windowing strategy.
   */
  @VisibleForTesting
  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
          CoderRegistry registry, KvCoder<K, InputT> kvCoder) {
    return withInputCoder(fn, registry, kvCoder, null, null);
  }

  /**
   * Creates an {@link AppliedCombineFn}, asking the (cloned) combine fn for an accumulator
   * coder based on the key and value coders of {@code kvCoder}.
   *
   * @throws IllegalStateException if the accumulator coder cannot be inferred
   */
  public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
      withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn,
          CoderRegistry registry, KvCoder<K, InputT> kvCoder,
          Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) {
    // Casting down the K and InputT is safe because they're only used as inputs.
    @SuppressWarnings("unchecked")
    PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn =
        (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
    try {
      Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder(
          registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
      return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
    } catch (CannotProvideCoderException e) {
      throw new IllegalStateException("Could not determine coder for accumulator", e);
    }
  }

  // Shared constructor wrapper; all public factories funnel through here.
  private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create(
      PerKeyCombineFn<K, InputT, AccumT, OutputT> fn,
      Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews,
      KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) {
    return new AppliedCombineFn<>(
        fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy);
  }

  /** Returns the (cloned) combine fn of this application. */
  public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() {
    return fn;
  }

  /** Returns the side input views, or null if none were supplied. */
  public Iterable<PCollectionView<?>> getSideInputViews() {
    return sideInputViews;
  }

  /** Returns the fixed accumulator coder of this application. */
  public Coder<AccumT> getAccumulatorCoder() {
    return accumulatorCoder;
  }

  /** Returns the KV input coder, or null if none was supplied. */
  public KvCoder<K, InputT> getKvCoder() {
    return kvCoder;
  }

  /** Returns the windowing strategy, or null if none was supplied. */
  public WindowingStrategy<?, ?> getWindowingStrategy() {
    return windowingStrategy;
  }
}

/*
 * Copyright (C) 2015 Google Inc.
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; - -import org.joda.time.Instant; - -import java.util.Collection; - -/** - * {@link DoFn} that tags elements of a PCollection with windows, according - * to the provided {@link WindowFn}. - * @param <T> Type of elements being windowed - * @param <W> Window type - */ -@SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> { - private WindowFn<? super T, W> fn; - - public AssignWindowsDoFn(WindowFn<? super T, W> fn) { - this.fn = fn; - } - - @Override - @SuppressWarnings("unchecked") - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = - ((WindowFn<T, W>) fn).assignWindows( - ((WindowFn<T, W>) fn).new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public Collection<? 
extends BoundedWindow> windows() { - return c.windowingInternals().windows(); - } - }); - - c.windowingInternals() - .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java deleted file mode 100644 index e94d414..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.NanoClock; -import com.google.common.base.Preconditions; - -import java.util.concurrent.TimeUnit; - -/** - * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff - * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff - * unless the time interval has expired since the object was created. 
At this point, it will always - * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts, - * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the - * constructor. - * - * <p>Implementation is not thread-safe. - */ -public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff { - private long endTimeMillis; - private long maximumTotalWaitTimeMillis; - private ResetPolicy resetPolicy; - private final NanoClock nanoClock; - // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns. Here, we choose 2^53 ns as - // a smaller but still huge limit. - private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53; - - /** - * A ResetPolicy controls the behavior of this BackOff when reset() is called. By default, both - * the number of attempts and the time bound for the BackOff are reset, but an alternative - * ResetPolicy may be set to only reset one of these two. - */ - public static enum ResetPolicy { - ALL, - ATTEMPTS, - TIMER - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. - * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) { - this( - maximumNumberOfAttempts, - initialIntervalMillis, - maximumTotalWaitTimeMillis, - ResetPolicy.ALL, - NanoClock.SYSTEM); - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. 
- * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject - * to being reset. - */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, - long initialIntervalMillis, - long maximumTotalWaitTimeMillis, - ResetPolicy resetPolicy) { - this( - maximumNumberOfAttempts, - initialIntervalMillis, - maximumTotalWaitTimeMillis, - resetPolicy, - NanoClock.SYSTEM); - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. - * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject - * to being reset. - * @param nanoClock clock used to measure the time that has passed. 
- */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, - long initialIntervalMillis, - long maximumTotalWaitTimeMillis, - ResetPolicy resetPolicy, - NanoClock nanoClock) { - super(maximumNumberOfAttempts, initialIntervalMillis); - Preconditions.checkArgument( - maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero."); - Preconditions.checkArgument( - maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS, - "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds"); - Preconditions.checkArgument(resetPolicy != null, "resetPolicy may not be null"); - Preconditions.checkArgument(nanoClock != null, "nanoClock may not be null"); - this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis; - this.resetPolicy = resetPolicy; - this.nanoClock = nanoClock; - // Set the end time for this BackOff. Note that we cannot simply call reset() here since the - // resetPolicy may not be set to reset the time bound. - endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis; - } - - @Override - public void reset() { - // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are - // set. In this case, we call the parent class's reset() method and return. - if (resetPolicy == null) { - super.reset(); - return; - } - // Reset the number of attempts. - if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) { - super.reset(); - } - // Reset the time bound. - if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) { - endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis; - } - } - - public void setEndtimeMillis(long endTimeMillis) { - this.endTimeMillis = endTimeMillis; - } - - @Override - public long nextBackOffMillis() { - if (atMaxAttempts()) { - return BackOff.STOP; - } - long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis()); - return (backoff > 0 ? 
backoff : BackOff.STOP); - } - - private long getTimeMillis() { - return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime()); - } - - @Override - public boolean atMaxAttempts() { - return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java deleted file mode 100644 index 613316e..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AttemptBoundedExponentialBackOff.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import com.google.api.client.util.BackOff; -import com.google.common.base.Preconditions; - -/** - * Implementation of {@link BackOff} that increases the back off period for each retry attempt - * using a randomization function that grows exponentially. - * - * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10. 
- * For 10 tries the sequence will be (values in seconds): - * - * <pre> - * retry# retry_interval randomized_interval - * 1 0.5 [0.25, 0.75] - * 2 0.75 [0.375, 1.125] - * 3 1.125 [0.562, 1.687] - * 4 1.687 [0.8435, 2.53] - * 5 2.53 [1.265, 3.795] - * 6 3.795 [1.897, 5.692] - * 7 5.692 [2.846, 8.538] - * 8 8.538 [4.269, 12.807] - * 9 12.807 [6.403, 19.210] - * 10 {@link BackOff#STOP} - * </pre> - * - * <p>Implementation is not thread-safe. - */ -public class AttemptBoundedExponentialBackOff implements BackOff { - public static final double DEFAULT_MULTIPLIER = 1.5; - public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; - private final int maximumNumberOfAttempts; - private final long initialIntervalMillis; - private int currentAttempt; - - public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) { - Preconditions.checkArgument(maximumNumberOfAttempts > 0, - "Maximum number of attempts must be greater than zero."); - Preconditions.checkArgument(initialIntervalMillis > 0, - "Initial interval must be greater than zero."); - this.maximumNumberOfAttempts = maximumNumberOfAttempts; - this.initialIntervalMillis = initialIntervalMillis; - reset(); - } - - @Override - public void reset() { - currentAttempt = 1; - } - - @Override - public long nextBackOffMillis() { - if (currentAttempt >= maximumNumberOfAttempts) { - return BackOff.STOP; - } - double currentIntervalMillis = initialIntervalMillis - * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1); - double randomOffset = (Math.random() * 2 - 1) - * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; - currentAttempt += 1; - return Math.round(currentIntervalMillis + randomOffset); - } - - public boolean atMaxAttempts() { - return currentAttempt >= maximumNumberOfAttempts; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java 
---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java deleted file mode 100644 index c3a4861..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/AvroUtils.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util; - -import static com.google.common.base.MoreObjects.firstNonNull; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Verify.verify; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.file.DataFileConstants; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DecoderFactory; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import 
java.nio.channels.Channels; -import java.util.Arrays; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * A set of utilities for working with Avro files. - * - * <p>These utilities are based on the <a - * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification. - */ -public class AvroUtils { - - /** - * Avro file metadata. - */ - public static class AvroMetadata { - private byte[] syncMarker; - private String codec; - private String schemaString; - - AvroMetadata(byte[] syncMarker, String codec, String schemaString) { - this.syncMarker = syncMarker; - this.codec = codec; - this.schemaString = schemaString; - } - - /** - * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a> - * string for the file. - */ - public String getSchemaString() { - return schemaString; - } - - /** - * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the - * file. - */ - public String getCodec() { - return codec; - } - - /** - * The 16-byte sync marker for the file. See the documentation for - * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object - * Container File</a> for more information. - */ - public byte[] getSyncMarker() { - return syncMarker; - } - } - - /** - * Reads the {@link AvroMetadata} from the header of an Avro file. - * - * <p>This method parses the header of an Avro - * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files"> - * Object Container File</a>. - * - * @throws IOException if the file is an invalid format. 
- */ - public static AvroMetadata readMetadataFromFile(String fileName) throws IOException { - String codec = null; - String schemaString = null; - byte[] syncMarker; - try (InputStream stream = - Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) { - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); - - // The header of an object container file begins with a four-byte magic number, followed - // by the file metadata (including the schema and codec), encoded as a map. Finally, the - // header ends with the file's 16-byte sync marker. - // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on - // the encoding of container files. - - // Read the magic number. - byte[] magic = new byte[DataFileConstants.MAGIC.length]; - decoder.readFixed(magic); - if (!Arrays.equals(magic, DataFileConstants.MAGIC)) { - throw new IOException("Missing Avro file signature: " + fileName); - } - - // Read the metadata to find the codec and schema. - ByteBuffer valueBuffer = ByteBuffer.allocate(512); - long numRecords = decoder.readMapStart(); - while (numRecords > 0) { - for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) { - String key = decoder.readString(); - // readBytes() clears the buffer and returns a buffer where: - // - position is the start of the bytes read - // - limit is the end of the bytes read - valueBuffer = decoder.readBytes(valueBuffer); - byte[] bytes = new byte[valueBuffer.remaining()]; - valueBuffer.get(bytes); - if (key.equals(DataFileConstants.CODEC)) { - codec = new String(bytes, "UTF-8"); - } else if (key.equals(DataFileConstants.SCHEMA)) { - schemaString = new String(bytes, "UTF-8"); - } - } - numRecords = decoder.mapNext(); - } - if (codec == null) { - codec = DataFileConstants.NULL_CODEC; - } - - // Finally, read the sync marker. 
- syncMarker = new byte[DataFileConstants.SYNC_SIZE]; - decoder.readFixed(syncMarker); - } - return new AvroMetadata(syncMarker, codec, schemaString); - } - - /** - * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and - * immutable. - */ - private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER = - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC(); - // Package private for BigQueryTableRowIterator to use. - static String formatTimestamp(String timestamp) { - // timestamp is in "seconds since epoch" format, with scientific notation. - // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC". - // Separate into seconds and microseconds. - double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000; - long timestampMicros = (long) timestampDoubleMicros; - long seconds = timestampMicros / 1000000; - int micros = (int) (timestampMicros % 1000000); - String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000); - - // No sub-second component. - if (micros == 0) { - return String.format("%s UTC", dayAndTime); - } - - // Sub-second component. - int digits = 6; - int subsecond = micros; - while (subsecond % 10 == 0) { - digits--; - subsecond /= 10; - } - String formatString = String.format("%%0%dd", digits); - String fractionalSeconds = String.format(formatString, subsecond); - return String.format("%s.%s UTC", dayAndTime, fractionalSeconds); - } - - /** - * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. - * - * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config"> - * "Avro format"</a> for more information. 
- */ - public static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) { - return convertGenericRecordToTableRow(record, schema.getFields()); - } - - private static TableRow convertGenericRecordToTableRow( - GenericRecord record, List<TableFieldSchema> fields) { - TableRow row = new TableRow(); - for (TableFieldSchema subSchema : fields) { - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field - // is required, so it may not be null. - Field field = record.getSchema().getField(subSchema.getName()); - Object convertedValue = - getTypedCellValue(field.schema(), subSchema, record.get(field.name())); - if (convertedValue != null) { - // To match the JSON files exported by BigQuery, do not include null values in the output. - row.set(field.name(), convertedValue); - } - } - return row; - } - - @Nullable - private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) { - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field - // is optional (and so it may be null), but defaults to "NULLABLE". - String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE"); - switch (mode) { - case "REQUIRED": - return convertRequiredField(schema.getType(), fieldSchema, v); - case "REPEATED": - return convertRepeatedField(schema, fieldSchema, v); - case "NULLABLE": - return convertNullableField(schema, fieldSchema, v); - default: - throw new UnsupportedOperationException( - "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode()); - } - } - - private static List<Object> convertRepeatedField( - Schema schema, TableFieldSchema fieldSchema, Object v) { - Type arrayType = schema.getType(); - verify( - arrayType == Type.ARRAY, - "BigQuery REPEATED field %s should be Avro ARRAY, not %s", - fieldSchema.getName(), - arrayType); - // REPEATED fields are represented as Avro arrays. 
- if (v == null) { - // Handle the case of an empty repeated field. - return ImmutableList.of(); - } - @SuppressWarnings("unchecked") - List<Object> elements = (List<Object>) v; - ImmutableList.Builder<Object> values = ImmutableList.builder(); - Type elementType = schema.getElementType().getType(); - for (Object element : elements) { - values.add(convertRequiredField(elementType, fieldSchema, element)); - } - return values.build(); - } - - private static Object convertRequiredField( - Type avroType, TableFieldSchema fieldSchema, Object v) { - // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery - // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - ImmutableMap<String, Type> fieldMap = - ImmutableMap.<String, Type>builder() - .put("STRING", Type.STRING) - .put("INTEGER", Type.LONG) - .put("FLOAT", Type.DOUBLE) - .put("BOOLEAN", Type.BOOLEAN) - .put("TIMESTAMP", Type.LONG) - .put("RECORD", Type.RECORD) - .build(); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. - String bqType = fieldSchema.getType(); - Type expectedAvroType = fieldMap.get(bqType); - verify( - avroType == expectedAvroType, - "Expected Avro schema type %s, not %s, for BigQuery %s field %s", - expectedAvroType, - avroType, - bqType, - fieldSchema.getName()); - switch (fieldSchema.getType()) { - case "STRING": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. 
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "INTEGER": - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - return ((Long) v).toString(); - case "FLOAT": - verify(v instanceof Double, "Expected Double, got %s", v.getClass()); - return v; - case "BOOLEAN": - verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass()); - return v; - case "TIMESTAMP": - // TIMESTAMP data types are represented as Avro LONG types. They are converted back to - // Strings with variable-precision (up to six digits) to match the JSON files export - // by BigQuery. - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - Double doubleValue = ((Long) v) / 1000000.0; - return formatTimestamp(doubleValue.toString()); - case "RECORD": - verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass()); - return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields()); - default: - throw new UnsupportedOperationException( - String.format( - "Unexpected BigQuery field schema type %s for field named %s", - fieldSchema.getType(), - fieldSchema.getName())); - } - } - - @Nullable - private static Object convertNullableField( - Schema avroSchema, TableFieldSchema fieldSchema, Object v) { - // NULLABLE fields are represented as an Avro Union of the corresponding type and "null". 
- verify( - avroSchema.getType() == Type.UNION, - "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s", - avroSchema.getType(), - fieldSchema.getName()); - List<Schema> unionTypes = avroSchema.getTypes(); - verify( - unionTypes.size() == 2, - "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s", - fieldSchema.getName(), - unionTypes); - - if (v == null) { - return null; - } - - Type firstType = unionTypes.get(0).getType(); - if (!firstType.equals(Type.NULL)) { - return convertRequiredField(firstType, fieldSchema, v); - } - return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java deleted file mode 100644 index 6a0ccf3..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BaseExecutionContext.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.util; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler; -import com.google.cloud.dataflow.sdk.util.state.StateInternals; -import com.google.cloud.dataflow.sdk.values.TupleTag; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * Base class for implementations of {@link ExecutionContext}. - * - * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate - * {@link StepContext} implementation. Any {@code StepContext} created will - * be cached for the lifetime of this {@link ExecutionContext}. - * - * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link StepContext} from {@link #getOrCreateStepContext(String, String, StateSampler)} and - * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. - * <pre> - * @Override - * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) { - * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...); - * } - * </pre> - * - * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of - * {@link #createStepContext(String, String, StateSampler)}, - * {@link #getOrCreateStepContext(String, String, StateSampler}, and {@link #getAllStepContexts()} - * will be appropriately specialized. - */ -public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext> - implements ExecutionContext { - - private Map<String, T> cachedStepContexts = new HashMap<>(); - - /** - * Implementations should override this to create the specific type - * of {@link StepContext} they need. 
- */ - protected abstract T createStepContext( - String stepName, String transformName, StateSampler stateSampler); - - - /** - * Returns the {@link StepContext} associated with the given step. - */ - @Override - public T getOrCreateStepContext( - String stepName, String transformName, StateSampler stateSampler) { - T context = cachedStepContexts.get(stepName); - if (context == null) { - context = createStepContext(stepName, transformName, stateSampler); - cachedStepContexts.put(stepName, context); - } - return context; - } - - /** - * Returns a collection view of all of the {@link StepContext}s. - */ - @Override - public Collection<? extends T> getAllStepContexts() { - return Collections.unmodifiableCollection(cachedStepContexts.values()); - } - - /** - * Hook for subclasses to implement that will be called whenever - * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#output} - * is called. - */ - @Override - public void noteOutput(WindowedValue<?> output) {} - - /** - * Hook for subclasses to implement that will be called whenever - * {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context#sideOutput} - * is called. - */ - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {} - - /** - * Base class for implementations of {@link ExecutionContext.StepContext}. - * - * <p>To complete a concrete subclass, implement {@link #timerInternals} and - * {@link #stateInternals}. 
- */ - public abstract static class StepContext implements ExecutionContext.StepContext { - private final ExecutionContext executionContext; - private final String stepName; - private final String transformName; - - public StepContext(ExecutionContext executionContext, String stepName, String transformName) { - this.executionContext = executionContext; - this.stepName = stepName; - this.transformName = transformName; - } - - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - - @Override - public void noteOutput(WindowedValue<?> output) { - executionContext.noteOutput(output); - } - - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { - executionContext.noteSideOutput(tag, output); - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, - W window, Coder<W> windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public abstract StateInternals<?> stateInternals(); - - @Override - public abstract TimerInternals timerInternals(); - } -}
