http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java new file mode 100644 index 0000000..91e34ac --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -0,0 +1,3229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName; +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; +import org.apache.beam.runners.dataflow.internal.AssignWindows; +import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; +import org.apache.beam.runners.dataflow.internal.IsmFormat; +import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; +import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; +import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; +import org.apache.beam.runners.dataflow.internal.ReadTranslator; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.runners.dataflow.util.DataflowTransport; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.CoderException; +import 
org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.PubsubIO; +import org.apache.beam.sdk.io.PubsubUnboundedSink; +import org.apache.beam.sdk.io.PubsubUnboundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.ShardNameTemplate; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.PathValidator; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.ValueWithRecordId; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import 
org.apache.beam.sdk.values.TupleTagList; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.clouddebugger.v2.Clouddebugger; +import com.google.api.services.clouddebugger.v2.model.Debuggee; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.api.services.dataflow.model.WorkerPool; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Utf8; +import com.google.common.collect.ForwardingMap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.joda.time.DateTimeUtils; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.format.DateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * A {@link PipelineRunner} that executes the operations in the + * pipeline by first translating them to the Dataflow representation + * using the {@link DataflowPipelineTranslator} and then submitting + * them to a Dataflow service for execution. + * + * <p><h3>Permissions</h3> + * When reading from a Dataflow source or writing to a Dataflow sink using + * {@code DataflowRunner}, the Google cloudservices account and the Google compute engine + * service account of the GCP project running the Dataflow Job will need access to the corresponding + * source/sink. + * + * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud + * Dataflow Security and Permissions</a> for more details. + */ +public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { + private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class); + + /** Provided configuration options. */ + private final DataflowPipelineOptions options; + + /** Client for the Dataflow service. This is used to actually submit jobs. */ + private final Dataflow dataflowClient; + + /** Translator for this DataflowRunner, based on options. 
*/ + private final DataflowPipelineTranslator translator; + + /** Custom transforms implementations. */ + private final Map<Class<?>, Class<?>> overrides; + + /** A set of user defined functions to invoke at different points in execution. */ + private DataflowRunnerHooks hooks; + + // Environment version information. + private static final String ENVIRONMENT_MAJOR_VERSION = "5"; + + // Default Docker container images that execute Dataflow worker harness, residing in Google + // Container Registry, separately for Batch and Streaming. + public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE + = "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160613"; + public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE + = "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160613"; + + // The limit of CreateJob request size. + private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; + + private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat; + + /** + * Project IDs must contain lowercase letters, digits, or dashes. + * IDs must start with a letter and may not end with a dash. + * This regex isn't exact - this allows for patterns that would be rejected by + * the service, but this is sufficient for basic validation of project IDs. + */ + public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]"; + + /** + * Construct a runner from the provided options. + * + * @param options Properties that configure the runner. + * @return The newly created runner. + */ + public static DataflowRunner fromOptions(PipelineOptions options) { + // (Re-)register standard IO factories. Clobbers any prior credentials. + IOChannelUtils.registerStandardIOFactories(options); + + DataflowPipelineOptions dataflowOptions = + PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options); + ArrayList<String> missing = new ArrayList<>(); + + if (dataflowOptions.getAppName() == null) { + missing.add("appName"); + } + if (missing.size() > 0) { + throw new IllegalArgumentException( + "Missing required values: " + Joiner.on(',').join(missing)); + } + + PathValidator validator = dataflowOptions.getPathValidator(); + Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation()) + && Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())), + "Missing required value: at least one of tempLocation or stagingLocation must be set."); + + if (dataflowOptions.getStagingLocation() != null) { + validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); + } + if (dataflowOptions.getTempLocation() != null) { + validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation()); + } + if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) { + dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation()); + } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) { + try { + dataflowOptions.setStagingLocation( + IOChannelUtils.resolve(dataflowOptions.getTempLocation(), "staging")); + } catch (IOException e) { + throw new IllegalArgumentException("Unable to resolve PipelineOptions.stagingLocation " + + "from PipelineOptions.tempLocation. Please set the staging location explicitly.", e); + } + } + + if (dataflowOptions.getFilesToStage() == null) { + dataflowOptions.setFilesToStage(detectClassPathResourcesToStage( + DataflowRunner.class.getClassLoader())); + LOG.info("PipelineOptions.filesToStage was not specified. 
" + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged.", + dataflowOptions.getFilesToStage().size()); + LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage()); + } + + // Verify jobName according to service requirements, truncating converting to lowercase if + // necessary. + String jobName = + dataflowOptions + .getJobName() + .toLowerCase(); + checkArgument( + jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), + "JobName invalid; the name must consist of only the characters " + + "[-a-z0-9], starting with a letter and ending with a letter " + + "or number"); + if (!jobName.equals(dataflowOptions.getJobName())) { + LOG.info( + "PipelineOptions.jobName did not match the service requirements. " + + "Using {} instead of {}.", + jobName, + dataflowOptions.getJobName()); + } + dataflowOptions.setJobName(jobName); + + // Verify project + String project = dataflowOptions.getProject(); + if (project.matches("[0-9]*")) { + throw new IllegalArgumentException("Project ID '" + project + + "' invalid. Please make sure you specified the Project ID, not project number."); + } else if (!project.matches(PROJECT_ID_REGEXP)) { + throw new IllegalArgumentException("Project ID '" + project + + "' invalid. Please make sure you specified the Project ID, not project description."); + } + + DataflowPipelineDebugOptions debugOptions = + dataflowOptions.as(DataflowPipelineDebugOptions.class); + // Verify the number of worker threads is a valid value + if (debugOptions.getNumberOfWorkerHarnessThreads() < 0) { + throw new IllegalArgumentException("Number of worker harness threads '" + + debugOptions.getNumberOfWorkerHarnessThreads() + + "' invalid. Please make sure the value is non-negative."); + } + + return new DataflowRunner(dataflowOptions); + } + + @VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) { + this.options = options; + this.dataflowClient = options.getDataflowClient(); + this.translator = DataflowPipelineTranslator.fromOptions(options); + this.pcollectionsRequiringIndexedFormat = new HashSet<>(); + this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); + + ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder(); + if (options.isStreaming()) { + builder.put(Combine.GloballyAsSingletonView.class, + StreamingCombineGloballyAsSingletonView.class); + builder.put(Create.Values.class, StreamingCreate.class); + builder.put(View.AsMap.class, StreamingViewAsMap.class); + builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class); + builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class); + builder.put(View.AsList.class, StreamingViewAsList.class); + builder.put(View.AsIterable.class, StreamingViewAsIterable.class); + builder.put(Write.Bound.class, StreamingWrite.class); + builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); + builder.put(Read.Bounded.class, UnsupportedIO.class); + builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class); + builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class); + builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class); + builder.put(TextIO.Read.Bound.class, UnsupportedIO.class); + builder.put(TextIO.Write.Bound.class, UnsupportedIO.class); + builder.put(Window.Bound.class, AssignWindows.class); + // In streaming mode must use either the custom Pubsub unbounded source/sink or + // defer to Windmill's built-in implementation. 
+ builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class); + builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class); + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_source")) { + builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class); + } + if (options.getExperiments() == null + || !options.getExperiments().contains("enable_custom_pubsub_sink")) { + builder.put(PubsubUnboundedSink.class, StreamingPubsubIOWrite.class); + } + } else { + builder.put(Read.Unbounded.class, UnsupportedIO.class); + builder.put(Window.Bound.class, AssignWindows.class); + builder.put(Write.Bound.class, BatchWrite.class); + builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); + builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class); + // In batch mode must use the custom Pubsub bounded source/sink. + builder.put(PubsubUnboundedSource.class, UnsupportedIO.class); + builder.put(PubsubUnboundedSink.class, UnsupportedIO.class); + if (options.getExperiments() == null + || !options.getExperiments().contains("disable_ism_side_input")) { + builder.put(View.AsMap.class, BatchViewAsMap.class); + builder.put(View.AsMultimap.class, BatchViewAsMultimap.class); + builder.put(View.AsSingleton.class, BatchViewAsSingleton.class); + builder.put(View.AsList.class, BatchViewAsList.class); + builder.put(View.AsIterable.class, BatchViewAsIterable.class); + } + } + overrides = builder.build(); + } + + /** + * Applies the given transform to the input. For transforms with customized definitions + * for the Dataflow pipeline runner, the application is intercepted and modified here. + */ + @Override + public <OutputT extends POutput, InputT extends PInput> OutputT apply( + PTransform<InputT, OutputT> transform, InputT input) { + + if (Combine.GroupedValues.class.equals(transform.getClass()) + || GroupByKey.class.equals(transform.getClass())) { + + // For both Dataflow runners (streaming and batch), GroupByKey and GroupedValues are + // primitives. Returning a primitive output instead of the expanded definition + // signals to the translator that translation is necessary. + @SuppressWarnings("unchecked") + PCollection<?> pc = (PCollection<?>) input; + @SuppressWarnings("unchecked") + OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal( + pc.getPipeline(), + transform instanceof GroupByKey + ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy()) + : pc.getWindowingStrategy(), + pc.isBounded()); + return outputT; + } else if (Window.Bound.class.equals(transform.getClass())) { + /* + * TODO: make this the generic way overrides are applied (using super.apply() rather than + * Pipeline.applyTransform(); this allows the apply method to be replaced without inserting + * additional nodes into the graph. + */ + // casting to wildcard + @SuppressWarnings("unchecked") + OutputT windowed = (OutputT) applyWindow((Window.Bound<?>) transform, (PCollection<?>) input); + return windowed; + } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) + && ((PCollectionList<?>) input).size() == 0) { + return (OutputT) Pipeline.applyTransform(input, Create.of()); + } else if (overrides.containsKey(transform.getClass())) { + // It is the responsibility of whoever constructs overrides to ensure this is type safe. 
+ @SuppressWarnings("unchecked") + Class<PTransform<InputT, OutputT>> transformClass = + (Class<PTransform<InputT, OutputT>>) transform.getClass(); + + @SuppressWarnings("unchecked") + Class<PTransform<InputT, OutputT>> customTransformClass = + (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass()); + + PTransform<InputT, OutputT> customTransform = + InstanceBuilder.ofType(customTransformClass) + .withArg(DataflowRunner.class, this) + .withArg(transformClass, transform) + .build(); + + return Pipeline.applyTransform(input, customTransform); + } else { + return super.apply(transform, input); + } + } + + private <T> PCollection<T> applyWindow( + Window.Bound<?> intitialTransform, PCollection<?> initialInput) { + // types are matched at compile time + @SuppressWarnings("unchecked") + Window.Bound<T> transform = (Window.Bound<T>) intitialTransform; + @SuppressWarnings("unchecked") + PCollection<T> input = (PCollection<T>) initialInput; + return super.apply(new AssignWindows<>(transform), input); + } + + private String debuggerMessage(String projectId, String uniquifier) { + return String.format("To debug your job, visit Google Cloud Debugger at: " + + "https://console.developers.google.com/debug?project=%s&dbgee=%s", + projectId, uniquifier); + } + + private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { + if (!options.getEnableCloudDebugger()) { + return; + } + + if (options.getDebuggee() != null) { + throw new RuntimeException("Should not specify the debuggee"); + } + + Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build(); + Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); + options.setDebuggee(debuggee); + + System.out.println(debuggerMessage(options.getProject(), debuggee.getUniquifier())); + } + + private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) { + RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest(); + registerReq.setDebuggee(new Debuggee() + .setProject(options.getProject()) + .setUniquifier(uniquifier) + .setDescription(uniquifier) + .setAgentVersion("google.com/cloud-dataflow-java/v1")); + + try { + RegisterDebuggeeResponse registerResponse = + debuggerClient.controller().debuggees().register(registerReq).execute(); + Debuggee debuggee = registerResponse.getDebuggee(); + if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) { + throw new RuntimeException("Unable to register with the debugger: " + + debuggee.getStatus().getDescription().getFormat()); + } + + return debuggee; + } catch (IOException e) { + throw new RuntimeException("Unable to register with the debugger: ", e); + } + } + + @Override + public DataflowPipelineJob run(Pipeline pipeline) { + logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); + + LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications " + + "related to Google Compute Engine usage and other Google Cloud Services."); + + List<DataflowPackage> packages = options.getStager().stageFiles(); + + + // Set a unique client_request_id in the CreateJob request. + // This is used to ensure idempotence of job creation across retried + // attempts to create a job. Specifically, if the service returns a job with + // a different client_request_id, it means the returned one is a different + // job previously created with the same job name, and that the job creation + // has been effectively rejected. 
The SDK should return + // Error::Already_Exists to user in that case. + int randomNum = new Random().nextInt(9000) + 1000; + String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC) + .print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum; + + // Try to create a debuggee ID. This must happen before the job is translated since it may + // update the options. + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + maybeRegisterDebuggee(dataflowOptions, requestId); + + JobSpecification jobSpecification = + translator.translate(pipeline, this, packages); + Job newJob = jobSpecification.getJob(); + newJob.setClientRequestId(requestId); + + String version = ReleaseInfo.getReleaseInfo().getVersion(); + System.out.println("Dataflow SDK version: " + version); + + newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo()); + // The Dataflow Service may write to the temporary directory directly, so + // must be verified. + if (!Strings.isNullOrEmpty(options.getTempLocation())) { + newJob.getEnvironment().setTempStoragePrefix( + dataflowOptions.getPathValidator().verifyPath(options.getTempLocation())); + } + newJob.getEnvironment().setDataset(options.getTempDatasetId()); + newJob.getEnvironment().setExperiments(options.getExperiments()); + + // Set the Docker container image that executes Dataflow worker harness, residing in Google + // Container Registry. Translator is guaranteed to create a worker pool prior to this point. + String workerHarnessContainerImage = + options.as(DataflowPipelineWorkerPoolOptions.class) + .getWorkerHarnessContainerImage(); + for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { + workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); + } + + // Requirements about the service. + Map<String, Object> environmentVersion = new HashMap<>(); + environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, ENVIRONMENT_MAJOR_VERSION); + newJob.getEnvironment().setVersion(environmentVersion); + // Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can + // autoscale if specified. 
+ String jobType = "JAVA_BATCH_AUTOSCALING"; + + if (options.isStreaming()) { + jobType = "STREAMING"; + } + environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType); + + if (hooks != null) { + hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment()); + } + + if (!Strings.isNullOrEmpty(options.getDataflowJobFile())) { + try (PrintWriter printWriter = new PrintWriter( + new File(options.getDataflowJobFile()))) { + String workSpecJson = DataflowPipelineTranslator.jobToString(newJob); + printWriter.print(workSpecJson); + LOG.info("Printed workflow specification to {}", options.getDataflowJobFile()); + } catch (IllegalStateException ex) { + LOG.warn("Cannot translate workflow spec to json for debug."); + } catch (FileNotFoundException ex) { + LOG.warn("Cannot create workflow spec output file."); + } + } + + String jobIdToUpdate = null; + if (options.isUpdate()) { + jobIdToUpdate = getJobIdFromName(options.getJobName()); + newJob.setTransformNameMapping(options.getTransformNameMapping()); + newJob.setReplaceJobId(jobIdToUpdate); + } + Job jobResult; + try { + jobResult = dataflowClient + .projects() + .jobs() + .create(options.getProject(), newJob) + .execute(); + } catch (GoogleJsonResponseException e) { + String errorMessages = "Unexpected errors"; + if (e.getDetails() != null) { + if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) { + errorMessages = "The size of the serialized JSON representation of the pipeline " + + "exceeds the allowable limit. " + + "For more information, please check the FAQ link below:\n" + + "https://cloud.google.com/dataflow/faq"; + } else { + errorMessages = e.getDetails().getMessage(); + } + } + throw new RuntimeException("Failed to create a workflow job: " + errorMessages, e); + } catch (IOException e) { + throw new RuntimeException("Failed to create a workflow job", e); + } + + // Obtain all of the extractors from the PTransforms used in the pipeline so the + // DataflowPipelineJob has access to them. + AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline); + Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = + aggregatorExtractor.getAggregatorSteps(); + + DataflowAggregatorTransforms aggregatorTransforms = + new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames()); + + // Use a raw client for post-launch monitoring, as status calls may fail + // regularly and need not be retried automatically. + DataflowPipelineJob dataflowPipelineJob = + new DataflowPipelineJob(options.getProject(), jobResult.getId(), + DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms); + + // If the service returned client request id, the SDK needs to compare it + // with the original id generated in the request, if they are not the same + // (i.e., the returned job is not created by this request), throw + // DataflowJobAlreadyExistsException or DataflowJobAlreadyUpdatedExcetpion + // depending on whether this is a reload or not. + if (jobResult.getClientRequestId() != null && !jobResult.getClientRequestId().isEmpty() + && !jobResult.getClientRequestId().equals(requestId)) { + // If updating a job. 
+ if (options.isUpdate()) { + throw new DataflowJobAlreadyUpdatedException(dataflowPipelineJob, + String.format("The job named %s with id: %s has already been updated into job id: %s " + + "and cannot be updated again.", + newJob.getName(), jobIdToUpdate, jobResult.getId())); + } else { + throw new DataflowJobAlreadyExistsException(dataflowPipelineJob, + String.format("There is already an active job named %s with id: %s. If you want " + + "to submit a second job, try again by setting a different name using --jobName.", + newJob.getName(), jobResult.getId())); + } + } + + LOG.info("To access the Dataflow monitoring console, please navigate to {}", + MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId())); + System.out.println("Submitted job: " + jobResult.getId()); + + LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", + MonitoringUtil.getGcloudCancelCommand(options, jobResult.getId())); + + return dataflowPipelineJob; + } + + /** + * Returns the DataflowPipelineTranslator associated with this object. + */ + public DataflowPipelineTranslator getTranslator() { + return translator; + } + + /** + * Sets callbacks to invoke during execution see {@code DataflowRunnerHooks}. + */ + @Experimental + public void setHooks(DataflowRunnerHooks hooks) { + this.hooks = hooks; + } + + ///////////////////////////////////////////////////////////////////////////// + + /** Outputs a warning about PCollection views without deterministic key coders. */ + private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) { + // We need to wait till this point to determine the names of the transforms since only + // at this time do we know the hierarchy of the transforms otherwise we could + // have just recorded the full names during apply time. + if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { + final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); + pipeline.traverseTopologically(new PipelineVisitor() { + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + } + }); + + LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} " + + "because the key coder is not deterministic. Falling back to singleton implementation " + + "which may cause memory and/or performance problems. Future major versions of " + + "Dataflow will require deterministic key coders.", + ptransformViewNamesWithNonDeterministicKeyCoders); + } + } + + /** + * Returns true if the passed in {@link PCollection} needs to be materialiazed using + * an indexed format. + */ + boolean doesPCollectionRequireIndexedFormat(PCollection<?> pcol) { + return pcollectionsRequiringIndexedFormat.contains(pcol); + } + + /** + * Marks the passed in {@link PCollection} as requiring to be materialized using + * an indexed format. 
+ */ + private void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) { + pcollectionsRequiringIndexedFormat.add(pcol); + } + + /** A set of {@link View}s with non-deterministic key coders. */ + Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders; + + /** + * Records that the {@link PTransform} requires a deterministic key coder. + */ + private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) { + ptransformViewsWithNonDeterministicKeyCoders.add(ptransform); + } + + /** + * A {@link GroupByKey} transform for the {@link DataflowRunner} which sorts + * values using the secondary key {@code K2}. + * + * <p>The {@link PCollection} created created by this {@link PTransform} will have values in + * the empty window. Care must be taken *afterwards* to either re-window + * (using {@link Window#into}) or only use {@link PTransform}s that do not depend on the + * values being within a window. + */ + static class GroupByKeyAndSortValuesOnly<K1, K2, V> + extends PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1, Iterable<KV<K2, V>>>>> { + private GroupByKeyAndSortValuesOnly() { + } + + @Override + public PCollection<KV<K1, Iterable<KV<K2, V>>>> apply(PCollection<KV<K1, KV<K2, V>>> input) { + PCollection<KV<K1, Iterable<KV<K2, V>>>> rval = + PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + IsBounded.BOUNDED); + + @SuppressWarnings({"unchecked", "rawtypes"}) + KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder(); + rval.setCoder( + KvCoder.of(inputCoder.getKeyCoder(), + IterableCoder.of(inputCoder.getValueCoder()))); + return rval; + } + } + + /** + * A {@link PTransform} that groups the values by a hash of the window's byte representation + * and sorts the values using the windows byte representation. + */ + private static class GroupByWindowHashAsKeyAndWindowAsSortKey<T, W extends BoundedWindow> extends + PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> { + + /** + * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for + * grouping by the hash of the window's byte representation and sorting the grouped values + * using the window's byte representation. 
+ */ + @SystemDoFnInternal + private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow> + extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements DoFn.RequiresWindowAccess { + + private final IsmRecordCoder<?> ismCoderForHash; + private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) { + this.ismCoderForHash = ismCoderForHash; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + @SuppressWarnings("unchecked") + W window = (W) c.window(); + c.output( + KV.of(ismCoderForHash.hash(ImmutableList.of(window)), + KV.of(window, + WindowedValue.of( + c.element(), + c.timestamp(), + c.window(), + c.pane())))); + } + } + + private final IsmRecordCoder<?> ismCoderForHash; + private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmRecordCoder<?> ismCoderForHash) { + this.ismCoderForHash = ismCoderForHash; + } + + @Override + public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> apply(PCollection<T> input) { + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + PCollection<KV<Integer, KV<W, WindowedValue<T>>>> rval = + input.apply(ParDo.of( + new UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W>(ismCoderForHash))); + rval.setCoder( + KvCoder.of( + VarIntCoder.of(), + KvCoder.of(windowCoder, + FullWindowedValueCoder.of(input.getCoder(), windowCoder)))); + return rval.apply(new GroupByKeyAndSortValuesOnly<Integer, W, WindowedValue<T>>()); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the + * Dataflow runner in batch mode. + * + * <p>Creates a set of files in the {@link IsmFormat} sharded by the hash of the windows + * byte representation and with records having: + * <ul> + * <li>Key 1: Window</li> + * <li>Value: Windowed value</li> + * </ul> + */ + static class BatchViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + + /** + * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows: + * <ul> + * <li>Key 1: Window + * <li>Value: Windowed value + * </ul> + */ + static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<T>>> { + + private final Coder<W> windowCoder; + IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + Optional<Object> previousWindowStructuralValue = Optional.absent(); + T previousValue = null; + + Iterator<KV<W, WindowedValue<T>>> iterator = c.element().getValue().iterator(); + while (iterator.hasNext()) { + KV<W, WindowedValue<T>> next = iterator.next(); + Object currentWindowStructuralValue = windowCoder.structuralValue(next.getKey()); + + // Verify that the user isn't trying to have more than one element per window as + // a singleton. 
+ checkState(!previousWindowStructuralValue.isPresent() + || !previousWindowStructuralValue.get().equals(currentWindowStructuralValue), + "Multiple values [%s, %s] found for singleton within window [%s].", + previousValue, + next.getValue().getValue(), + next.getKey()); + + c.output( + IsmRecord.of( + ImmutableList.of(next.getKey()), next.getValue())); + + previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); + previousValue = next.getValue().getValue(); + } + } + } + + private final DataflowRunner runner; + private final View.AsSingleton<T> transform; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public BatchViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> transform) { + this.runner = runner; + this.transform = transform; + } + + @Override + public PCollectionView<T> apply(PCollection<T> input) { + @SuppressWarnings("unchecked") + Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + return BatchViewAsSingleton.<T, T, T, BoundedWindow>applyForSingleton( + runner, + input, + new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(windowCoder), + transform.hasDefaultValue(), + transform.defaultValue(), + input.getCoder()); + } + + static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT> + applyForSingleton( + DataflowRunner runner, + PCollection<T> input, + DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<FinalT>>> doFn, + boolean hasDefault, + FinalT defaultValue, + Coder<FinalT> defaultValueCoder) { + + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + PCollectionView<ViewT> view = + (PCollectionView<ViewT>) PCollectionViews.<FinalT, W>singletonView( + input.getPipeline(), + (WindowingStrategy) input.getWindowingStrategy(), + hasDefault, + defaultValue, + defaultValueCoder); + + IsmRecordCoder<WindowedValue<FinalT>> ismCoder = + coderForSingleton(windowCoder, defaultValueCoder); + + PCollection<IsmRecord<WindowedValue<FinalT>>> reifiedPerWindowAndSorted = input + .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(ismCoder)) + .apply(ParDo.of(doFn)); + reifiedPerWindowAndSorted.setCoder(ismCoder); + + runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); + return reifiedPerWindowAndSorted.apply( + CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view)); + } + + @Override + protected String getKindString() { + return "BatchViewAsSingleton"; + } + + static <T> IsmRecordCoder<WindowedValue<T>> coderForSingleton( + Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) { + return IsmRecordCoder.of( + 1, // We hash using only the window + 0, // There are no metadata records + ImmutableList.<Coder<?>>of(windowCoder), + FullWindowedValueCoder.of(valueCoder, windowCoder)); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the + * Dataflow runner in batch mode. 
+ * + * <p>Creates a set of {@code Ism} files sharded by the hash of the windows byte representation + * and with records having: + * <ul> + * <li>Key 1: Window</li> + * <li>Key 2: Index offset within window</li> + * <li>Value: Windowed value</li> + * </ul> + */ + static class BatchViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + + private final DataflowRunner runner; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public BatchViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Iterable<T>> apply(PCollection<T> input) { + PCollectionView<Iterable<T>> view = PCollectionViews.iterableView( + input.getPipeline(), input.getWindowingStrategy(), input.getCoder()); + return BatchViewAsList.applyForIterableLike(runner, input, view); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the + * Dataflow runner in batch mode. + * + * <p>Creates a set of {@code Ism} files sharded by the hash of the window's byte representation + * and with records having: + * <ul> + * <li>Key 1: Window</li> + * <li>Key 2: Index offset within window</li> + * <li>Value: Windowed value</li> + * </ul> + */ + static class BatchViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + /** + * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the + * global window. Each {@link IsmRecord} has + * <ul> + * <li>Key 1: Global window</li> + * <li>Key 2: Index offset within window</li> + * <li>Value: Windowed value</li> + * </ul> + */ + @SystemDoFnInternal + static class ToIsmRecordForGlobalWindowDoFn<T> + extends DoFn<T, IsmRecord<WindowedValue<T>>> { + + long indexInBundle; + @Override + public void startBundle(Context c) throws Exception { + indexInBundle = 0; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(IsmRecord.of( + ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle), + WindowedValue.of( + c.element(), + c.timestamp(), + GlobalWindow.INSTANCE, + c.pane()))); + indexInBundle += 1; + } + } + + /** + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows + * to locate the window boundaries. The {@link IsmRecord} has: + * <ul> + * <li>Key 1: Window</li> + * <li>Key 2: Index offset within window</li> + * <li>Value: Windowed value</li> + * </ul> + */ + @SystemDoFnInternal + static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + IsmRecord<WindowedValue<T>>> { + + private final Coder<W> windowCoder; + ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + long elementsInWindow = 0; + Optional<Object> previousWindowStructuralValue = Optional.absent(); + for (KV<W, WindowedValue<T>> value : c.element().getValue()) { + Object currentWindowStructuralValue = windowCoder.structuralValue(value.getKey()); + // Compare to see if this is a new window so we can reset the index counter i + if (previousWindowStructuralValue.isPresent() + && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) { + // Reset i since we have a new window. 
+ elementsInWindow = 0; + } + c.output(IsmRecord.of( + ImmutableList.of(value.getKey(), elementsInWindow), + value.getValue())); + previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); + elementsInWindow += 1; + } + } + } + + private final DataflowRunner runner; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public BatchViewAsList(DataflowRunner runner, View.AsList<T> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<List<T>> apply(PCollection<T> input) { + PCollectionView<List<T>> view = PCollectionViews.listView( + input.getPipeline(), input.getWindowingStrategy(), input.getCoder()); + return applyForIterableLike(runner, input, view); + } + + static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike( + DataflowRunner runner, + PCollection<T> input, + PCollectionView<ViewT> view) { + + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + IsmRecordCoder<WindowedValue<T>> ismCoder = coderForListLike(windowCoder, input.getCoder()); + + // If we are working in the global window, we do not need to do a GBK using the window + // as the key since all the elements of the input PCollection are already such. + // We just reify the windowed value while converting them to IsmRecords and generating + // an index based upon where we are within the bundle. Each bundle + // maps to one file exactly. + if (input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) { + PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = + input.apply(ParDo.of(new ToIsmRecordForGlobalWindowDoFn<T>())); + reifiedPerWindowAndSorted.setCoder(ismCoder); + + runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); + return reifiedPerWindowAndSorted.apply( + CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); + } + + PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input + .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(ismCoder)) + .apply(ParDo.of(new ToIsmRecordForNonGlobalWindowDoFn<T, W>(windowCoder))); + reifiedPerWindowAndSorted.setCoder(ismCoder); + + runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); + return reifiedPerWindowAndSorted.apply( + CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); + } + + @Override + protected String getKindString() { + return "BatchViewAsList"; + } + + static <T> IsmRecordCoder<WindowedValue<T>> coderForListLike( + Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) { + // TODO: swap to use a variable length long coder which has values which compare + // the same as their byte representation compare lexicographically within the key coder + return IsmRecordCoder.of( + 1, // We hash using only the window + 0, // There are no metadata records + ImmutableList.of(windowCoder, BigEndianLongCoder.of()), + FullWindowedValueCoder.of(valueCoder, windowCoder)); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} for the + * Dataflow runner in batch mode. + * + * <p>Creates a set of {@code Ism} files sharded by the hash of the key's byte + * representation. 
Each record is structured as follows: + * <ul> + * <li>Key 1: User key K</li> + * <li>Key 2: Window</li> + * <li>Key 3: 0L (constant)</li> + * <li>Value: Windowed value</li> + * </ul> + * + * <p>Alongside the data records, there are the following metadata records: + * <ul> + * <li>Key 1: Metadata Key</li> + * <li>Key 2: Window</li> + * <li>Key 3: Index [0, size of map]</li> + * <li>Value: variable length long byte representation of size of map if index is 0, + * otherwise the byte representation of a key</li> + * </ul> + * The {@code [META, Window, 0]} record stores the number of unique keys per window, while + * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. + * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} + * and iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in + * {@code [1, size of map]}. + * + * <p>Note that in the case of a non-deterministic key coder, we fallback to using + * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing + * a warning to users to specify a deterministic key coder. + */ + static class BatchViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + /** + * A {@link DoFn} which groups elements by window boundaries. For each group, + * the group of elements is transformed into a {@link TransformedMap}. + * The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>} + * and contains a function {@code WindowedValue<V> -> V}. + * + * <p>Outputs {@link IsmRecord}s having: + * <ul> + * <li>Key 1: Window</li> + * <li>Value: Transformed map containing a transform that removes the encapsulation + * of the window around each value, + * {@code Map<K, WindowedValue<V>> -> Map<K, V>}.</li> + * </ul> + */ + static class ToMapDoFn<K, V, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + IsmRecord<WindowedValue<TransformedMap<K, + WindowedValue<V>, + V>>>> { + + private final Coder<W> windowCoder; + ToMapDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) + throws Exception { + Optional<Object> previousWindowStructuralValue = Optional.absent(); + Optional<W> previousWindow = Optional.absent(); + Map<K, WindowedValue<V>> map = new HashMap<>(); + for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) { + Object currentWindowStructuralValue = windowCoder.structuralValue(kv.getKey()); + if (previousWindowStructuralValue.isPresent() + && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) { + // Construct the transformed map containing all the elements since we + // are at a window boundary. + c.output(IsmRecord.of( + ImmutableList.of(previousWindow.get()), + valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); + map = new HashMap<>(); + } + + // Verify that the user isn't trying to insert the same key multiple times. 
+ checkState(!map.containsKey(kv.getValue().getValue().getKey()), + "Multiple values [%s, %s] found for single key [%s] within window [%s].", + map.get(kv.getValue().getValue().getKey()), + kv.getValue().getValue().getValue(), + kv.getKey()); + map.put(kv.getValue().getValue().getKey(), + kv.getValue().withValue(kv.getValue().getValue().getValue())); + previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); + previousWindow = Optional.of(kv.getKey()); + } + + // The last value for this hash is guaranteed to be at a window boundary + // so we output a transformed map containing all the elements since the last + // window boundary. + c.output(IsmRecord.of( + ImmutableList.of(previousWindow.get()), + valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); + } + } + + private final DataflowRunner runner; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public BatchViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) { + return this.<BoundedWindow>applyInternal(input); + } + + private <W extends BoundedWindow> PCollectionView<Map<K, V>> + applyInternal(PCollection<KV<K, V>> input) { + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + PCollectionView<Map<K, V>> view = PCollectionViews.mapView( + input.getPipeline(), input.getWindowingStrategy(), inputCoder); + return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */); + } catch (NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + + // Since the key coder is not deterministic, we convert the map into a singleton + // and return a singleton view equivalent. + return applyForSingletonFallback(input); + } + } + + @Override + protected String getKindString() { + return "BatchViewAsMap"; + } + + /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ + private <W extends BoundedWindow> PCollectionView<Map<K, V>> + applyForSingletonFallback(PCollection<KV<K, V>> input) { + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder<Function<WindowedValue<V>, V>> transformCoder = + (Coder) SerializableCoder.of(WindowedValueToValue.class); + + Coder<TransformedMap<K, WindowedValue<V>, V>> finalValueCoder = + TransformedMapCoder.of( + transformCoder, + MapCoder.of( + inputCoder.getKeyCoder(), + FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder))); + + TransformedMap<K, WindowedValue<V>, V> defaultValue = new TransformedMap<>( + WindowedValueToValue.<V>of(), + ImmutableMap.<K, WindowedValue<V>>of()); + + return BatchViewAsSingleton.<KV<K, V>, + TransformedMap<K, WindowedValue<V>, V>, + Map<K, V>, + W> applyForSingleton( + runner, + input, + new ToMapDoFn<K, V, W>(windowCoder), + true, + defaultValue, + finalValueCoder); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the + * Dataflow runner in batch mode. 
+ * + * <p>Creates a set of {@code Ism} files sharded by the hash of the key's byte + * representation. Each record is structured as follows: + * <ul> + * <li>Key 1: User key K</li> + * <li>Key 2: Window</li> + * <li>Key 3: Index offset for a given key and window.</li> + * <li>Value: Windowed value</li> + * </ul> + * + * <p>Alongside the data records, there are the following metadata records: + * <ul> + * <li>Key 1: Metadata Key</li> + * <li>Key 2: Window</li> + * <li>Key 3: Index [0, size of map]</li> + * <li>Value: variable length long byte representation of size of map if index is 0, + * otherwise the byte representation of a key</li> + * </ul> + * The {@code [META, Window, 0]} record stores the number of unique keys per window, while + * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]} stores a the users key. + * This allows for one to access the size of the map by looking at {@code [META, Window, 0]} + * and iterate over all the keys by accessing {@code [META, Window, i]} for {@code i} in + * {@code [1, size of map]}. + * + * <p>Note that in the case of a non-deterministic key coder, we fallback to using + * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} printing + * a warning to users to specify a deterministic key coder. + */ + static class BatchViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + /** + * A {@link PTransform} that groups elements by the hash of window's byte representation + * if the input {@link PCollection} is not within the global window. Otherwise by the hash + * of the window and key's byte representation. This {@link PTransform} also sorts + * the values by the combination of the window and key's byte representations. + */ + private static class GroupByKeyHashAndSortByKeyAndWindow<K, V, W extends BoundedWindow> + extends PTransform<PCollection<KV<K, V>>, + PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>> { + + @SystemDoFnInternal + private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W> + extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> + implements DoFn.RequiresWindowAccess { + + private final IsmRecordCoder<?> coder; + private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) { + this.coder = coder; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + @SuppressWarnings("unchecked") + W window = (W) c.window(); + + c.output( + KV.of(coder.hash(ImmutableList.of(c.element().getKey())), + KV.of(KV.of(c.element().getKey(), window), + WindowedValue.of( + c.element().getValue(), + c.timestamp(), + (BoundedWindow) window, + c.pane())))); + } + } + + private final IsmRecordCoder<?> coder; + public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder<?> coder) { + this.coder = coder; + } + + @Override + public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>> + apply(PCollection<KV<K, V>> input) { + + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + @SuppressWarnings("unchecked") + KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder(); + + PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> keyedByHash; + keyedByHash = input.apply( + ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>(coder))); + keyedByHash.setCoder( + KvCoder.of( + VarIntCoder.of(), + KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), windowCoder), + 
FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)))); + + return keyedByHash.apply( + new GroupByKeyAndSortValuesOnly<Integer, KV<K, W>, WindowedValue<V>>()); + } + } + + /** + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements' windows + * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have: + * <ul> + * <li>Key 1: User key K</li> + * <li>Key 2: Window</li> + * <li>Key 3: Index offset for a given key and window.</li> + * <li>Value: Windowed value</li> + * </ul> + * + * <p>Additionally, we output all the unique keys seen per window to {@code outputForEntrySet} + * and the unique key count per window to {@code outputForSize}. + * + * <p>Finally, if this DoFn has been requested to perform unique key checking, it will + * throw an {@link IllegalStateException} if the same key appears more than once within a window. + */ + static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, + IsmRecord<WindowedValue<V>>> { + + private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize; + private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet; + private final Coder<W> windowCoder; + private final Coder<K> keyCoder; + private final IsmRecordCoder<WindowedValue<V>> ismCoder; + private final boolean uniqueKeysExpected; + ToIsmRecordForMapLikeDoFn( + TupleTag<KV<Integer, KV<W, Long>>> outputForSize, + TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet, + Coder<W> windowCoder, + Coder<K> keyCoder, + IsmRecordCoder<WindowedValue<V>> ismCoder, + boolean uniqueKeysExpected) { + this.outputForSize = outputForSize; + this.outputForEntrySet = outputForEntrySet; + this.windowCoder = windowCoder; + this.keyCoder = keyCoder; + this.ismCoder = ismCoder; + this.uniqueKeysExpected = uniqueKeysExpected; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + long currentKeyIndex = 0; + // We use one-based indexing while counting + long currentUniqueKeyCounter = 1; + Iterator<KV<KV<K, W>, WindowedValue<V>>> iterator = c.element().getValue().iterator(); + + KV<KV<K, W>, WindowedValue<V>> currentValue = iterator.next(); + Object currentKeyStructuralValue = + keyCoder.structuralValue(currentValue.getKey().getKey()); + Object currentWindowStructuralValue = + windowCoder.structuralValue(currentValue.getKey().getValue()); + + while (iterator.hasNext()) { + KV<KV<K, W>, WindowedValue<V>> nextValue = iterator.next(); + Object nextKeyStructuralValue = + keyCoder.structuralValue(nextValue.getKey().getKey()); + Object nextWindowStructuralValue = + windowCoder.structuralValue(nextValue.getKey().getValue()); + + outputDataRecord(c, currentValue, currentKeyIndex); + + final long nextKeyIndex; + final long nextUniqueKeyCounter; + + // Check to see if it's a new window + if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) { + // The next value is in a new window, so we output the number of unique keys seen + // for the size metadata and the last key of the window for the entry set. We also + // reset the next key index and the unique key counter. + outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter); + outputMetadataRecordForEntrySet(c, currentValue); + + nextKeyIndex = 0; + nextUniqueKeyCounter = 1; + } else if (!currentKeyStructuralValue.equals(nextKeyStructuralValue)) { + // It is a new key within the same window so output the key for the entry set, + // reset the key index and increase the count of unique keys seen within this window.
+ outputMetadataRecordForEntrySet(c, currentValue); + + nextKeyIndex = 0; + nextUniqueKeyCounter = currentUniqueKeyCounter + 1; + } else if (!uniqueKeysExpected) { + // It is not a new key so we don't have to output the number of elements in this + // window or increase the unique key counter. All we do is increase the key index. + + nextKeyIndex = currentKeyIndex + 1; + nextUniqueKeyCounter = currentUniqueKeyCounter; + } else { + throw new IllegalStateException(String.format( + "Unique keys are expected but found key %s with values %s and %s in window %s.", + currentValue.getKey().getKey(), + currentValue.getValue().getValue(), + nextValue.getValue().getValue(), + currentValue.getKey().getValue())); + } + + currentValue = nextValue; + currentWindowStructuralValue = nextWindowStructuralValue; + currentKeyStructuralValue = nextKeyStructuralValue; + currentKeyIndex = nextKeyIndex; + currentUniqueKeyCounter = nextUniqueKeyCounter; + } + + outputDataRecord(c, currentValue, currentKeyIndex); + outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter); + // The last value for this hash is guaranteed to be at a window boundary + // so we output a record with the number of unique keys seen. + outputMetadataRecordForEntrySet(c, currentValue); + } + + /** This outputs the data record. */ + private void outputDataRecord( + ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long keyIndex) { + IsmRecord<WindowedValue<V>> ismRecord = IsmRecord.of( + ImmutableList.of( + value.getKey().getKey(), + value.getKey().getValue(), + keyIndex), + value.getValue()); + c.output(ismRecord); + } + + /** + * This outputs records which will be used to compute the number of keys for a given window. + */ + private void outputMetadataRecordForSize( + ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long uniqueKeyCount) { + c.sideOutput(outputForSize, + KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), + value.getKey().getValue())), + KV.of(value.getKey().getValue(), uniqueKeyCount))); + } + + /** This outputs records which will be used to construct the entry set. */ + private void outputMetadataRecordForEntrySet( + ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) { + c.sideOutput(outputForEntrySet, + KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), + value.getKey().getValue())), + KV.of(value.getKey().getValue(), value.getKey().getKey()))); + } + } + + /** + * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of: + * <ul> + * <li>Key 1: META key</li> + * <li>Key 2: window</li> + * <li>Key 3: 0L (constant)</li> + * <li>Value: sum of values for window</li> + * </ul> + * + * <p>This {@link DoFn} is meant to be used to compute the number of unique keys + * per window for map and multimap side inputs. 
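+    *
+    * <p>As a rough sketch of the output shape (illustrative values only): assuming a window
+    * {@code w} whose grouped per-key counts sum to {@code 3}, the emitted metadata record is
+    * equivalent to:
+    * <pre>{@code
+    * IsmRecord.<WindowedValue<V>>meta(
+    *     ImmutableList.of(IsmFormat.getMetadataKey(), w, 0L),
+    *     CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L))
+    * }</pre>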
+ */ + static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> { + private final Coder<W> windowCoder; + ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + Iterator<KV<W, Long>> iterator = c.element().getValue().iterator(); + KV<W, Long> currentValue = iterator.next(); + Object currentWindowStructuralValue = windowCoder.structuralValue(currentValue.getKey()); + long size = 0; + while (iterator.hasNext()) { + KV<W, Long> nextValue = iterator.next(); + Object nextWindowStructuralValue = windowCoder.structuralValue(nextValue.getKey()); + + size += currentValue.getValue(); + if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) { + c.output(IsmRecord.<WindowedValue<V>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), 0L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), size))); + size = 0; + } + + currentValue = nextValue; + currentWindowStructuralValue = nextWindowStructuralValue; + } + + size += currentValue.getValue(); + // Output the final value since it is guaranteed to be on a window boundary. + c.output(IsmRecord.<WindowedValue<V>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), 0L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), size))); + } + } + + /** + * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: + * <ul> + * <li>Key 1: META key</li> + * <li>Key 2: window</li> + * <li>Key 3: index offset (1-based index)</li> + * <li>Value: key</li> + * </ul> + * + * <p>This {@link DoFn} is meant to be used to output index to key records + * per window for map and multimap side inputs. + */ + static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> { + + private final Coder<K> keyCoder; + private final Coder<W> windowCoder; + ToIsmMetadataRecordForKeyDoFn(Coder<K> keyCoder, Coder<W> windowCoder) { + this.keyCoder = keyCoder; + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + Iterator<KV<W, K>> iterator = c.element().getValue().iterator(); + KV<W, K> currentValue = iterator.next(); + Object currentWindowStructuralValue = windowCoder.structuralValue(currentValue.getKey()); + long elementsInWindow = 1; + while (iterator.hasNext()) { + KV<W, K> nextValue = iterator.next(); + Object nextWindowStructuralValue = windowCoder.structuralValue(nextValue.getKey()); + + c.output(IsmRecord.<WindowedValue<V>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), elementsInWindow), + CoderUtils.encodeToByteArray(keyCoder, currentValue.getValue()))); + elementsInWindow += 1; + + if (!currentWindowStructuralValue.equals(nextWindowStructuralValue)) { + elementsInWindow = 1; + } + + currentValue = nextValue; + currentWindowStructuralValue = nextWindowStructuralValue; + } + + // Output the final value since it is guaranteed to be on a window boundary. + c.output(IsmRecord.<WindowedValue<V>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), currentValue.getKey(), elementsInWindow), + CoderUtils.encodeToByteArray(keyCoder, currentValue.getValue()))); + } + } + + /** + * A {@link DoFn} which partitions sets of elements by window boundaries. 
Within each + * partition, the set of elements is transformed into a {@link TransformedMap}. + * The transformed {@code Map<K, Iterable<V>>} is backed by a + * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function + * {@code Iterable<WindowedValue<V>> -> Iterable<V>}. + * + * <p>Outputs {@link IsmRecord}s having: + * <ul> + * <li>Key 1: Window</li> + * <li>Value: Transformed map containing a transform that removes the encapsulation + * of the window around each value, + * {@code Map<K, Iterable<WindowedValue<V>>> -> Map<K, Iterable<V>>}.</li> + * </ul> + */ + static class ToMultimapDoFn<K, V, W extends BoundedWindow> + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + IsmRecord<WindowedValue<TransformedMap<K, + Iterable<WindowedValue<V>>, + Iterable<V>>>>> { + + private final Coder<W> windowCoder; + ToMultimapDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + + @Override + public void processElement(ProcessContext c) + throws Exception { + Optional<Object> previousWindowStructuralValue = Optional.absent(); + Optional<W> previousWindow = Optional.absent(); + Multimap<K, WindowedValue<V>> multimap = HashMultimap.create(); + for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) { + Object currentWindowStructuralValue = windowCoder.structuralValue(kv.getKey()); + if (previousWindowStructuralValue.isPresent() + && !previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) { + // Construct the transformed map containing all the elements since we + // are at a window boundary. + @SuppressWarnings({"unchecked", "rawtypes"}) + Map<K, Iterable<WindowedValue<V>>> resultMap = (Map) multimap.asMap(); + c.output(IsmRecord.<WindowedValue<TransformedMap<K, + Iterable<WindowedValue<V>>, + Iterable<V>>>>of( + ImmutableList.of(previousWindow.get()), + valueInEmptyWindows( + new TransformedMap<>( + IterableWithWindowedValuesToIterable.<V>of(), resultMap)))); + multimap = HashMultimap.create(); + } + + multimap.put(kv.getValue().getValue().getKey(), + kv.getValue().withValue(kv.getValue().getValue().getValue())); + previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); + previousWindow = Optional.of(kv.getKey()); + } + + // The last value for this hash is guaranteed to be at a window boundary + // so we output a transformed map containing all the elements since the last + // window boundary. + @SuppressWarnings({"unchecked", "rawtypes"}) + Map<K, Iterable<WindowedValue<V>>> resultMap = (Map) multimap.asMap(); + c.output(IsmRecord.<WindowedValue<TransformedMap<K, + Iterable<WindowedValue<V>>, + Iterable<V>>>>of( + ImmutableList.of(previousWindow.get()), + valueInEmptyWindows( + new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(), resultMap)))); + } + } + + private final DataflowRunner runner; + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() + public BatchViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + return this.<BoundedWindow>applyInternal(input); + } + + private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> + applyInternal(PCollection<KV<K, V>> input) { + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView( + input.getPipeline(), input.getWindowingStrategy(), inputCoder); + + return applyForMapLike(runner, input, view, false /* unique keys not expected */); + } catch (NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + + // Since the key coder is not deterministic, we convert the map into a singleton + // and return a singleton view equivalent. + return applyForSingletonFallback(input); + } + } + + /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ + private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> + applyForSingletonFallback(PCollection<KV<K, V>> input) { + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder<Function<Iterable<WindowedValue<V>>, Iterable<V>>> transformCoder = + (Coder) SerializableCoder.of(IterableWithWindowedValuesToIterable.class); + + Coder<TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>> finalValueCoder = + TransformedMapCoder.of( + transformCoder, + MapCoder.of( + inputCoder.getKeyCoder(), + IterableCoder.of( + FullWindowedValueCoder.of(inputCoder.getValueCoder(), windowCoder)))); + + TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>> defaultValue = + new TransformedMap<>( + IterableWithWindowedValuesToIterable.<V>of(), + ImmutableMap.<K, Iterable<WindowedValue<V>>>of()); + + return BatchViewAsSingleton.<KV<K, V>, + TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>, + Map<K, Iterable<V>>, + W> applyForSingleton( + runner, + input, + new ToMultimapDoFn<K, V, W>(windowCoder), + true, + defaultValue, + finalValueCoder); + } + + private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike( + DataflowRunner runner, + PCollection<KV<K, V>> input, + PCollectionView<ViewT> view, + boolean uniqueKeysExpected) throws NonDeterministicException { + + @SuppressWarnings("unchecked") + Coder<W> windowCoder = (Coder<W>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + + // If our key coder is deterministic, we can use the key portion of each KV + // as part of a composite key containing the window, key and index. + inputCoder.getKeyCoder().verifyDeterministic(); + + IsmRecordCoder<WindowedValue<V>> ismCoder = + coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder()); + + // Create the various output tags representing the main output containing the data stream + // and the side outputs containing the metadata about the size and entry set.
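+ // Roughly, the steps that follow are:
+ //   1. Group by key hash and sort by (key, window), then write the data records
+ //      [key, window, index] -> windowed value, side-outputting size and entry-set info.
+ //   2. Turn the size side output into [META, window, 0L] -> number of unique keys per window.
+ //   3. Turn the entry-set side output into [META, window, i] -> i'th key (1-based).
+ //   4. Flatten all three Ism record streams and create the requested PCollectionView.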
+ TupleTag<IsmRecord<WindowedValue<V>>> mainOutputTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<W, Long>>> outputForSizeTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<W, K>>> outputForEntrySetTag = new TupleTag<>(); + + // Process all the elements grouped by key hash, and sorted by key and then window + // outputting to all the outputs defined above. + PCollectionTuple outputTuple = input + .apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K, V, W>(ismCoder)) + .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<K, V, W>( + outputForSizeTag, outputForEntrySetTag, + windowCoder, inputCoder.getKeyCoder(), ismCoder, uniqueKeysExpected)) + .withOutputTags(mainOutputTag, + TupleTagList.of( + ImmutableList.<TupleTag<?>>of(outputForSizeTag, + outputForEntrySetTag)))); + + // Set the coder on the main data output. + PCollection<IsmRecord<WindowedValue<V>>> perHashWithReifiedWindows = + outputTuple.get(mainOutputTag); + perHashWithReifiedWindows.setCoder(ismCoder); + + // Set the coder on the metadata output for size and process the entries + // producing a [META, Window, 0L] record per window storing the number of unique keys + // for each window. + PCollection<KV<Integer, KV<W, Long>>> outputForSize = outputTuple.get(outputForSizeTag); + outputForSize.setCoder( + KvCoder.of(VarIntCoder.of(), + KvCoder.of(windowCoder, VarLongCoder.of()))); + PCollection<IsmRecord<WindowedValue<V>>> windowMapSizeMetadata = outputForSize + .apply("GBKaSVForSize", new GroupByKeyAndSortValuesOnly<Integer, W, Long>()) + .apply(ParDo.of(new ToIsmMetadataRecordForSizeDoFn<K, V, W>(windowCoder))); + windowMapSizeMetadata.setCoder(ismCoder); + + // Set the coder on the metadata output destined to build the entry set and process the + // entries producing a [META, Window, Index] record per window key pair storing the key. + PCollection<KV<Integer, KV<W, K>>> outputForEntrySet = + outputTuple.get(outputForEntrySetTag); + outputForEntrySet.setCoder( + KvCoder.of(VarIntCoder.of(), + KvCoder.of(windowCoder, inputCoder.getKeyCoder()))); + PCollection<IsmRecord<WindowedValue<V>>> windowMapKeysMetadata = outputForEntrySet + .apply("GBKaSVForKeys", new GroupByKeyAndSortValuesOnly<Integer, W, K>()) + .apply(ParDo.of( + new ToIsmMetadataRecordForKeyDoFn<K, V, W>(inputCoder.getKeyCoder(), windowCoder))); + windowMapKeysMetadata.setCoder(ismCoder); + + // Set that all these outputs should be materialized using an indexed format. + runner.addPCollectionRequiringIndexedFormat(perHashWithReifiedWindows); + runner.addPCollectionRequiringIndexedFormat(windowMapSizeMetadata); + runner.addPCollectionRequiringIndexedFormat(windowMapKeysMetadata); + + PCollectionList<IsmRecord<WindowedValue<V>>> outputs = + PCollectionList.of(ImmutableList.of( + perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata)); + + return Pipeline.applyTransform(outputs, + Flatten.<IsmRecord<WindowedValue<V>>>pCollections()) + .apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>, + ViewT>of(view)); + } + + @Override + protected String getKindString() { + return "BatchViewAsMultimap"; + } + + static <V> IsmRecordCoder<WindowedValue<V>> coderForMapLike( + Coder<? 
extends BoundedWindow> windowCoder, Coder<?> keyCoder, Coder<V> valueCoder) { + // TODO: swap to use a variable-length long coder whose values compare the same way + // their byte representations compare lexicographically within the key coder + return IsmRecordCoder.of( + 1, // We use only the key for hashing when producing value records + 2, // Since the key is not present, we add the window to the hash when + // producing metadata records + ImmutableList.of( + MetadataKeyCoder.of(keyCoder), + windowCoder, + BigEndianLongCoder.of()), + FullWindowedValueCoder.of(valueCoder, windowCoder)); + } + } + + /** + * A {@code Map<K, V2>} backed by a {@code Map<K, V1>} and a function that transforms + * {@code V1 -> V2}. + */ + static class TransformedMap<K, V1, V2> + extends ForwardingMap<K, V2> { + private final Function<V1, V2> transform; + private final Map<K, V1> originalMap; + private final Map<K, V2> transformedMap; + + private TransformedMap(Function<V1, V2> transform, Map<K, V1> originalMap) { + this.transform = transform; + this.originalMap = Collections.unmodifiable
<TRUNCATED>
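
Side note on the truncated TransformedMap class above: the idea is a read-only Map<K, V2> view backed by a Map<K, V1> plus a V1 -> V2 transform function. Below is a minimal, self-contained sketch of that idea written against Guava only; the class name ValueTransformedMap, the main() demo, and its values are illustrative assumptions and not part of this commit.

import java.util.Collections;
import java.util.Map;

import com.google.common.base.Function;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

/** Illustrative only: a read-only Map<K, V2> backed by a Map<K, V1> and a V1 -> V2 transform. */
final class ValueTransformedMap<K, V1, V2> extends ForwardingMap<K, V2> {
  private final Map<K, V2> transformedMap;

  ValueTransformedMap(Function<V1, V2> transform, Map<K, V1> originalMap) {
    // Maps.transformValues returns a lazy, value-transformed view of the original map;
    // wrapping the original in unmodifiableMap keeps the view read-only end to end.
    this.transformedMap =
        Maps.transformValues(Collections.unmodifiableMap(originalMap), transform);
  }

  @Override
  protected Map<K, V2> delegate() {
    return transformedMap;
  }

  public static void main(String[] args) {
    Map<String, Integer> lengths = new ValueTransformedMap<String, String, Integer>(
        new Function<String, Integer>() {
          @Override
          public Integer apply(String input) {
            return input.length();
          }
        },
        ImmutableMap.of("a", "apple", "b", "banana"));
    System.out.println(lengths); // prints {a=5, b=6}
  }
}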
