http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java deleted file mode 100644 index f618bc9..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java +++ /dev/null @@ -1,957 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.io; - -import static com.google.api.services.datastore.DatastoreV1.PropertyFilter.Operator.EQUAL; -import static com.google.api.services.datastore.DatastoreV1.PropertyOrder.Direction.DESCENDING; -import static com.google.api.services.datastore.DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED; -import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap; -import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter; -import static com.google.api.services.datastore.client.DatastoreHelper.makeOrder; -import static com.google.api.services.datastore.client.DatastoreHelper.makeValue; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Verify.verify; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.datastore.DatastoreV1.CommitRequest; -import com.google.api.services.datastore.DatastoreV1.Entity; -import com.google.api.services.datastore.DatastoreV1.EntityResult; -import com.google.api.services.datastore.DatastoreV1.Key; -import com.google.api.services.datastore.DatastoreV1.Key.PathElement; -import com.google.api.services.datastore.DatastoreV1.PartitionId; -import com.google.api.services.datastore.DatastoreV1.Query; -import com.google.api.services.datastore.DatastoreV1.QueryResultBatch; -import com.google.api.services.datastore.DatastoreV1.RunQueryRequest; -import com.google.api.services.datastore.DatastoreV1.RunQueryResponse; -import com.google.api.services.datastore.client.Datastore; -import com.google.api.services.datastore.client.DatastoreException; -import com.google.api.services.datastore.client.DatastoreFactory; -import com.google.api.services.datastore.client.DatastoreHelper; -import com.google.api.services.datastore.client.DatastoreOptions; -import com.google.api.services.datastore.client.QuerySplitter; -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.EntityCoder; -import com.google.cloud.dataflow.sdk.coders.SerializableCoder; -import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation; 
-import com.google.cloud.dataflow.sdk.io.Sink.Writer; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; -import com.google.cloud.dataflow.sdk.options.GcpOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; -import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Ints; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -/** - * <p>{@link DatastoreIO} provides an API to Read and Write {@link PCollection PCollections} of - * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> - * {@link Entity} objects. - * - * <p>Google Cloud Datastore is a fully managed NoSQL data storage service. - * An {@code Entity} is an object in Datastore, analogous to a row in a traditional - * database table. - * - * <p>This API currently requires an authentication workaround. To use {@link DatastoreIO}, users - * must use the {@code gcloud} command line tool to get credentials for Datastore: - * <pre> - * $ gcloud auth login - * </pre> - * - * <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreIO#source} and - * its methods {@link DatastoreIO.Source#withDataset} and {@link DatastoreIO.Source#withQuery} to - * specify the dataset to query and the query to read from. You can optionally provide a namespace - * to query within using {@link DatastoreIO.Source#withNamespace} or a Datastore host using - * {@link DatastoreIO.Source#withHost}. - * - * <p>For example: - * - * <pre> {@code - * // Read a query from Datastore - * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - * Query query = ...; - * String datasetId = "..."; - * String host = "..."; - * - * Pipeline p = Pipeline.create(options); - * PCollection<Entity> entities = p.apply( - * Read.from(DatastoreIO.source() - * .withDataset(datasetId) - * .withQuery(query) - * .withHost(host))); - * } </pre> - * - * <p>or: - * - * <pre> {@code - * // Read a query from Datastore using the default namespace and host - * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - * Query query = ...; - * String datasetId = "..."; - * - * Pipeline p = Pipeline.create(options); - * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query)); - * p.run(); - * } </pre> - * - * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across - * many workers. However, when the {@link Query} is configured with a limit using - * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then - * all returned results will be read by a single Dataflow worker in order to ensure correct data.
 - * - * <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreIO#writeTo}, - * specifying the datastore to write to: - * - * <pre> {@code - * PCollection<Entity> entities = ...; - * entities.apply(DatastoreIO.writeTo(dataset)); - * p.run(); - * } </pre> - * - * <p>To optionally change the host that is used to write to the Datastore, use {@link - * DatastoreIO#sink} to build a {@link DatastoreIO.Sink} and write to it using the {@link Write} - * transform: - * - * <pre> {@code - * PCollection<Entity> entities = ...; - * entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset).withHost(host))); - * } </pre> - * - * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete - * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the - * {@code Entity}, whereas incomplete {@code Keys} do not. A {@code namespace} other than the - * project default may be written to by specifying it in the {@code Entity} {@code Keys}. - * - * <pre>{@code - * Key.Builder keyBuilder = DatastoreHelper.makeKey(...); - * keyBuilder.getPartitionIdBuilder().setNamespace(namespace); - * }</pre> - * - * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read - * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and - * Keys</a> for more information about {@code Entity} keys. - * - * <p><h3>Permissions</h3> - * Permission requirements depend on the {@code PipelineRunner} that is used to execute the - * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for - * more details. - * - * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up - * </a> for security and permission-related information specific to Datastore. - * - * @see com.google.cloud.dataflow.sdk.runners.PipelineRunner - */ -@Experimental(Experimental.Kind.SOURCE_SINK) -public class DatastoreIO { - public static final String DEFAULT_HOST = "https://www.googleapis.com"; - - /** - * Datastore has a limit of 500 mutations per batch operation, so we flush - * changes to Datastore every 500 entities. - */ - public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; - - /** - * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}. - * Configure the {@code dataset}, {@code query}, and {@code namespace} using - * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery}, - * and {@link DatastoreIO.Source#withNamespace}. - * - * @deprecated the name and return type do not match. Use {@link #source()}. - */ - @Deprecated - public static Source read() { - return source(); - } - - /** - * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}. - * Configure the {@code dataset}, {@code query}, and {@code namespace} using - * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery}, - * and {@link DatastoreIO.Source#withNamespace}. - * - * <p>The resulting {@link Source} object can be passed to {@link Read} to create a - * {@code PTransform} that will read from Datastore. - */ - public static Source source() { - return new Source(DEFAULT_HOST, null, null, null); - } - - /** - * Returns a {@code PTransform} that reads Datastore entities from the query - * against the given dataset.
- */ - public static Read.Bounded<Entity> readFrom(String datasetId, Query query) { - return Read.from(new Source(DEFAULT_HOST, datasetId, query, null)); - } - - /** - * Returns a {@code PTransform} that reads Datastore entities from the query - * against the given dataset and host. - * - * @deprecated prefer {@link #source()} with {@link Source#withHost}, {@link Source#withDataset}, - * and {@link Source#withQuery}. - */ - @Deprecated - public static Read.Bounded<Entity> readFrom(String host, String datasetId, Query query) { - return Read.from(new Source(host, datasetId, query, null)); - } - - /** - * A {@link Source} that reads the result rows of a Datastore query as {@code Entity} objects. - */ - public static class Source extends BoundedSource<Entity> { - public String getHost() { - return host; - } - - public String getDataset() { - return datasetId; - } - - public Query getQuery() { - return query; - } - - @Nullable - public String getNamespace() { - return namespace; - } - - public Source withDataset(String datasetId) { - checkNotNull(datasetId, "datasetId"); - return new Source(host, datasetId, query, namespace); - } - - /** - * Returns a new {@link Source} that reads the results of the specified query. - * - * <p>Does not modify this object. - * - * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel - * across many workers. However, when the {@link Query} is configured with a limit using - * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then all - * returned results will be read by a single Dataflow worker in order to ensure correct data. - */ - public Source withQuery(Query query) { - checkNotNull(query, "query"); - checkArgument(!query.hasLimit() || query.getLimit() > 0, - "Invalid query limit %s: must be positive", query.getLimit()); - return new Source(host, datasetId, query, namespace); - } - - public Source withHost(String host) { - checkNotNull(host, "host"); - return new Source(host, datasetId, query, namespace); - } - - public Source withNamespace(@Nullable String namespace) { - return new Source(host, datasetId, query, namespace); - } - - @Override - public Coder<Entity> getDefaultOutputCoder() { - return EntityCoder.of(); - } - - @Override - public boolean producesSortedKeys(PipelineOptions options) { - // TODO: Perhaps this can be implemented by inspecting the query. - return false; - } - - @Override - public List<Source> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) - throws Exception { - // Users may request a limit on the number of results. We can currently support this by - // simply disabling parallel reads and using only a single split. - if (query.hasLimit()) { - return ImmutableList.of(this); - } - - long numSplits; - try { - numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes); - } catch (Exception e) { - // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible. - - // 1. Try Dataflow's numWorkers, which will be 0 for other runners. - DataflowPipelineWorkerPoolOptions poolOptions = - options.as(DataflowPipelineWorkerPoolOptions.class); - if (poolOptions.getNumWorkers() > 0) { - LOG.warn("Estimated size unavailable, using the number of workers {}", - poolOptions.getNumWorkers(), e); - numSplits = poolOptions.getNumWorkers(); - } else { - // 2. Default to 12 in the unknown case.
- numSplits = 12; - } - } - - // If the desiredBundleSize or number of workers results in 1 split, simply return - // a source that reads from the original query. - if (numSplits <= 1) { - return ImmutableList.of(this); - } - - List<Query> datastoreSplits; - try { - datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options); - } catch (IllegalArgumentException | DatastoreException e) { - LOG.warn("Unable to parallelize the given query: {}", query, e); - return ImmutableList.of(this); - } - - ImmutableList.Builder<Source> splits = ImmutableList.builder(); - for (Query splitQuery : datastoreSplits) { - splits.add(new Source(host, datasetId, splitQuery, namespace)); - } - return splits.build(); - } - - @Override - public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException { - return new DatastoreReader(this, getDatastore(pipelineOptions)); - } - - @Override - public void validate() { - Preconditions.checkNotNull(host, "host"); - Preconditions.checkNotNull(query, "query"); - Preconditions.checkNotNull(datasetId, "datasetId"); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - // Datastore provides no way to get a good estimate of how large the result of a query - // will be. As a rough approximation, we attempt to fetch the statistics of the whole - // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind - // is specified in the query. - // - // See https://cloud.google.com/datastore/docs/concepts/stats - if (mockEstimateSizeBytes != null) { - return mockEstimateSizeBytes; - } - - Datastore datastore = getDatastore(options); - if (query.getKindCount() != 1) { - throw new UnsupportedOperationException( - "Can only estimate size for queries specifying exactly 1 kind."); - } - String ourKind = query.getKind(0).getName(); - long latestTimestamp = queryLatestStatisticsTimestamp(datastore); - Query.Builder query = Query.newBuilder(); - if (namespace == null) { - query.addKindBuilder().setName("__Stat_Kind__"); - } else { - query.addKindBuilder().setName("__Ns_Stat_Kind__"); - } - query.setFilter(makeFilter( - makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(), - makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build())); - RunQueryRequest request = makeRequest(query.build()); - - long now = System.currentTimeMillis(); - RunQueryResponse response = datastore.runQuery(request); - LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now); - - QueryResultBatch batch = response.getBatch(); - if (batch.getEntityResultCount() == 0) { - throw new NoSuchElementException( - "Datastore statistics for kind " + ourKind + " unavailable"); - } - Entity entity = batch.getEntityResult(0).getEntity(); - return getPropertyMap(entity).get("entity_bytes").getIntegerValue(); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("host", host) - .add("dataset", datasetId) - .add("query", query) - .add("namespace", namespace) - .toString(); - } - - /////////////////////////////////////////////////////////////////////////////////////////// - - private static final Logger LOG = LoggerFactory.getLogger(Source.class); - private final String host; - /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */ - @Nullable - private final String datasetId; - /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. 
*/ - @Nullable - private final Query query; - @Nullable - private final String namespace; - - /** For testing only. TODO: This could be much cleaner with dependency injection. */ - @Nullable - private QuerySplitter mockSplitter; - @Nullable - private Long mockEstimateSizeBytes; - - /** - * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be - * {@code null} as a matter of build order, but if they are {@code null} at instantiation time, - * an error will be thrown. - */ - private Source( - String host, @Nullable String datasetId, @Nullable Query query, - @Nullable String namespace) { - this.host = checkNotNull(host, "host"); - this.datasetId = datasetId; - this.query = query; - this.namespace = namespace; - } - - /** - * A helper function to get the split queries, taking into account the optional - * {@code namespace} and whether there is a mock splitter. - */ - private List<Query> getSplitQueries(int numSplits, PipelineOptions options) - throws DatastoreException { - // If namespace is set, include it in the split request so splits are calculated accordingly. - PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); - if (namespace != null) { - partitionBuilder.setNamespace(namespace); - } - - if (mockSplitter != null) { - // For testing. - return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null); - } - - return DatastoreHelper.getQuerySplitter().getSplits( - query, partitionBuilder.build(), numSplits, getDatastore(options)); - } - - /** - * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this - * {@code Source}. For example, sets the {@code namespace} for the request. - */ - private RunQueryRequest makeRequest(Query query) { - RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); - if (namespace != null) { - requestBuilder.getPartitionIdBuilder().setNamespace(namespace); - } - return requestBuilder.build(); - } - - /** - * Datastore system tables with statistics are periodically updated. This method fetches - * the latest timestamp of statistics update using the {@code __Stat_Total__} table. - */ - private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException { - Query.Builder query = Query.newBuilder(); - query.addKindBuilder().setName("__Stat_Total__"); - query.addOrder(makeOrder("timestamp", DESCENDING)); - query.setLimit(1); - RunQueryRequest request = makeRequest(query.build()); - - long now = System.currentTimeMillis(); - RunQueryResponse response = datastore.runQuery(request); - LOG.info("Query for latest stats timestamp of dataset {} took {}ms", datasetId, - System.currentTimeMillis() - now); - QueryResultBatch batch = response.getBatch(); - if (batch.getEntityResultCount() == 0) { - throw new NoSuchElementException( - "Datastore total statistics for dataset " + datasetId + " unavailable"); - } - Entity entity = batch.getEntityResult(0).getEntity(); - return getPropertyMap(entity).get("timestamp").getTimestampMicrosecondsValue(); - } - - private Datastore getDatastore(PipelineOptions pipelineOptions) { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder().host(host).dataset(datasetId).initializer( - new RetryHttpRequestInitializer()); - - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - return DatastoreFactory.get().create(builder.build()); - } - - /** For testing only. 
*/ - Source withMockSplitter(QuerySplitter splitter) { - Source res = new Source(host, datasetId, query, namespace); - res.mockSplitter = splitter; - res.mockEstimateSizeBytes = mockEstimateSizeBytes; - return res; - } - - /** For testing only. */ - Source withMockEstimateSizeBytes(Long estimateSizeBytes) { - Source res = new Source(host, datasetId, query, namespace); - res.mockSplitter = mockSplitter; - res.mockEstimateSizeBytes = estimateSizeBytes; - return res; - } - } - - ///////////////////// Write Class ///////////////////////////////// - - /** - * Returns a new {@link DatastoreIO.Sink} builder using the default host. - * You need to further configure it using {@link DatastoreIO.Sink#withDataset}, and optionally - * {@link DatastoreIO.Sink#withHost} before using it in a {@link Write} transform. - * - * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withDataset(dataset)));} - */ - public static Sink sink() { - return new Sink(DEFAULT_HOST, null); - } - - /** - * Returns a new {@link Write} transform that will write to a {@link Sink}. - * - * <p>For example: {@code p.apply(DatastoreIO.writeTo(dataset));} - */ - public static Write.Bound<Entity> writeTo(String datasetId) { - return Write.to(sink().withDataset(datasetId)); - } - - /** - * A {@link Sink} that writes a {@link PCollection} containing - * {@link Entity Entities} to a Datastore kind. - * - */ - public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<Entity> { - final String host; - final String datasetId; - - /** - * Returns a {@link Sink} that is like this one, but will write to the specified dataset. - */ - public Sink withDataset(String datasetId) { - checkNotNull(datasetId, "datasetId"); - return new Sink(host, datasetId); - } - - /** - * Returns a {@link Sink} that is like this one, but will use the given host. If not specified, - * the {@link DatastoreIO#DEFAULT_HOST default host} will be used. - */ - public Sink withHost(String host) { - checkNotNull(host, "host"); - return new Sink(host, datasetId); - } - - /** - * Constructs a Sink with the given host and dataset. - */ - protected Sink(String host, String datasetId) { - this.host = checkNotNull(host, "host"); - this.datasetId = datasetId; - } - - /** - * Ensures the host and dataset are set. - */ - @Override - public void validate(PipelineOptions options) { - Preconditions.checkNotNull( - host, "Host is a required parameter. Please use withHost to set the host."); - Preconditions.checkNotNull( - datasetId, - "Dataset ID is a required parameter. Please use withDataset to set the datasetId."); - } - - @Override - public DatastoreWriteOperation createWriteOperation(PipelineOptions options) { - return new DatastoreWriteOperation(this); - } - } - - /** - * A {@link WriteOperation} that will manage a parallel write to a Datastore sink. - */ - private static class DatastoreWriteOperation - extends WriteOperation<Entity, DatastoreWriteResult> { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class); - - private final DatastoreIO.Sink sink; - - public DatastoreWriteOperation(DatastoreIO.Sink sink) { - this.sink = sink; - } - - @Override - public Coder<DatastoreWriteResult> getWriterResultCoder() { - return SerializableCoder.of(DatastoreWriteResult.class); - } - - @Override - public void initialize(PipelineOptions options) throws Exception {} - - /** - * Finalizes the write. Logs the number of entities written to the Datastore.
- */ - @Override - public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options) - throws Exception { - long totalEntities = 0; - for (DatastoreWriteResult result : writerResults) { - totalEntities += result.entitiesWritten; - } - LOG.info("Wrote {} elements.", totalEntities); - } - - @Override - public DatastoreWriter createWriter(PipelineOptions options) throws Exception { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .host(sink.host) - .dataset(sink.datasetId) - .initializer(new RetryHttpRequestInitializer()); - Credential credential = options.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - Datastore datastore = DatastoreFactory.get().create(builder.build()); - - return new DatastoreWriter(this, datastore); - } - - @Override - public DatastoreIO.Sink getSink() { - return sink; - } - } - - /** - * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches, - * where the maximum batch size is {@link DatastoreIO#DATASTORE_BATCH_UPDATE_LIMIT}. Entities - * are committed as upsert mutations (either update if the key already exists, or insert if it is - * a new key). If an entity does not have a complete key (i.e., it has no name or id), the bundle - * will fail. - * - * <p>See <a - * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity"> - * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert - * mutations. - * - * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity - * group, the commit will be retried (up to {@link DatastoreWriter#MAX_RETRIES} - * times). - * - * <p>Visible for testing purposes. - */ - static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class); - private final DatastoreWriteOperation writeOp; - private final Datastore datastore; - private long totalWritten = 0; - - // Visible for testing. - final List<Entity> entities = new ArrayList<>(); - - /** - * Since a bundle is written in batches, we should retry the commit of a batch in order to - * prevent transient errors from causing the bundle to fail. - */ - private static final int MAX_RETRIES = 5; - - /** - * Initial backoff time for exponential backoff for retry attempts. - */ - private static final int INITIAL_BACKOFF_MILLIS = 5000; - - /** - * Returns true if a Datastore key is complete. A key is complete if its last element - * has either an id or a name. - */ - static boolean isValidKey(Key key) { - List<PathElement> elementList = key.getPathElementList(); - if (elementList.isEmpty()) { - return false; - } - PathElement lastElement = elementList.get(elementList.size() - 1); - return (lastElement.hasId() || lastElement.hasName()); - } - - // Visible for testing - DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) { - this.writeOp = writeOp; - this.datastore = datastore; - } - - @Override - public void open(String uId) throws Exception {} - - /** - * Writes an entity to the Datastore. Writes are batched, up to {@link - * DatastoreIO#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an - * {@link IllegalArgumentException} will be thrown. - */ - @Override - public void write(Entity value) throws Exception { - // Verify that the entity to write has a complete key.
- if (!isValidKey(value.getKey())) { - throw new IllegalArgumentException( - "Entities to be written to the Datastore must have complete keys"); - } - - entities.add(value); - - if (entities.size() >= DatastoreIO.DATASTORE_BATCH_UPDATE_LIMIT) { - flushBatch(); - } - } - - /** - * Flushes any pending batch writes and returns a DatastoreWriteResult. - */ - @Override - public DatastoreWriteResult close() throws Exception { - if (entities.size() > 0) { - flushBatch(); - } - return new DatastoreWriteResult(totalWritten); - } - - @Override - public DatastoreWriteOperation getWriteOperation() { - return writeOp; - } - - /** - * Writes a batch of entities to the Datastore. - * - * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES} - * times). All entities in the batch will be committed again, even if the commit was partially - * successful. If the retry limit is exceeded, the last exception from the Datastore will be - * thrown. - * - * @throws DatastoreException if the commit fails. - * @throws IOException if backing off between retries fails. - * @throws InterruptedException if backing off between retries fails. - */ - private void flushBatch() throws DatastoreException, IOException, InterruptedException { - LOG.debug("Writing batch of {} entities", entities.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); - - while (true) { - // Batch upsert entities. - try { - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.getMutationBuilder().addAllUpsert(entities); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - datastore.commit(commitRequest.build()); - - // Break if the commit threw no exception. - break; - - } catch (DatastoreException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. - LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), - exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - totalWritten += entities.size(); - LOG.debug("Successfully wrote {} entities", entities.size()); - entities.clear(); - } - } - - private static class DatastoreWriteResult implements Serializable { - final long entitiesWritten; - - public DatastoreWriteResult(long recordsWritten) { - this.entitiesWritten = recordsWritten; - } - } - - /** - * A {@link Source.Reader} over the records from a query of the datastore. - * - * <p>Timestamped records are currently not supported. - * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. - */ - public static class DatastoreReader extends BoundedSource.BoundedReader<Entity> { - private final Source source; - - /** - * Datastore to read from. - */ - private final Datastore datastore; - - /** - * True if more results may be available. - */ - private boolean moreResults; - - /** - * Iterator over records. - */ - private java.util.Iterator<EntityResult> entities; - - /** - * Current batch of query results. - */ - private QueryResultBatch currentBatch; - - /** - * Maximum number of results to request per query. - * - * <p>Must be set, or it may result in an I/O error when querying - * Cloud Datastore. - */ - private static final int QUERY_BATCH_LIMIT = 500; - - /** - * Remaining user-requested limit on the number of results to return.
If the user did not set a - * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never - * be decremented. - */ - private int userLimit; - - private Entity currentEntity; - - /** - * Returns a DatastoreReader with Source and Datastore object set. - * - * @param datastore a datastore connection to use. - */ - public DatastoreReader(Source source, Datastore datastore) { - this.source = source; - this.datastore = datastore; - // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE. - userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE; - } - - @Override - public Entity getCurrent() { - return currentEntity; - } - - @Override - public boolean start() throws IOException { - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (entities == null || (!entities.hasNext() && moreResults)) { - try { - entities = getIteratorAndMoveCursor(); - } catch (DatastoreException e) { - throw new IOException(e); - } - } - - if (entities == null || !entities.hasNext()) { - currentEntity = null; - return false; - } - - currentEntity = entities.next().getEntity(); - return true; - } - - @Override - public void close() throws IOException { - // Nothing - } - - @Override - public DatastoreIO.Source getCurrentSource() { - return source; - } - - @Override - public DatastoreIO.Source splitAtFraction(double fraction) { - // Not supported. - return null; - } - - @Override - public Double getFractionConsumed() { - // Not supported. - return null; - } - - /** - * Returns an iterator over the next batch of records for the query - * and updates the cursor to get the next batch as needed. - * Query has specified limit and offset from InputSplit. - */ - private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException { - Query.Builder query = source.query.toBuilder().clone(); - query.setLimit(Math.min(userLimit, QUERY_BATCH_LIMIT)); - if (currentBatch != null && currentBatch.hasEndCursor()) { - query.setStartCursor(currentBatch.getEndCursor()); - } - - RunQueryRequest request = source.makeRequest(query.build()); - RunQueryResponse response = datastore.runQuery(request); - - currentBatch = response.getBatch(); - - // MORE_RESULTS_AFTER_LIMIT is not implemented yet: - // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so - // use result count to determine if more results might exist. - int numFetch = currentBatch.getEntityResultCount(); - if (source.query.hasLimit()) { - verify(userLimit >= numFetch, - "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit", - userLimit, numFetch, query.getLimit()); - userLimit -= numFetch; - } - moreResults = - // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied. - (userLimit > 0) - // All indications from the API are that there are/may be more results. - && ((numFetch == QUERY_BATCH_LIMIT) || (currentBatch.getMoreResults() == NOT_FINISHED)); - - // May receive a batch of 0 results if the number of records is a multiple - // of the request limit. - if (numFetch == 0) { - return null; - } - - return currentBatch.getEntityResultList().iterator(); - } - } -}
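For readers tracing this removal, the source and sink halves of the deleted class compose into a short end-to-end pipeline. The sketch below is illustrative only and is not code from this commit: the dataset id "my-dataset-id" and the kind "Person" are hypothetical placeholders, while the DatastoreIO, Read, and Write entry points are exactly the ones defined in the file above.

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class DatastoreIOExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Query every entity of the (hypothetical) kind "Person".
    Query.Builder q = Query.newBuilder();
    q.addKindBuilder().setName("Person");

    // Read: splitIntoBundles() above turns this into parallel sub-queries,
    // unless the query carries a limit, in which case one worker reads it all.
    PCollection<Entity> people = p.apply(
        Read.from(DatastoreIO.source()
            .withDataset("my-dataset-id")
            .withQuery(q.build())));

    // Write: keys must be complete; DatastoreWriter commits upserts in
    // batches of DATASTORE_BATCH_UPDATE_LIMIT (500) with bounded retries.
    people.apply(DatastoreIO.writeTo("my-dataset-id"));

    p.run();
  }
}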
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java deleted file mode 100644 index dda500c..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ /dev/null @@ -1,864 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.io; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.batch.json.JsonBatchCallback; -import com.google.api.client.googleapis.json.GoogleJsonError; -import com.google.api.client.http.HttpHeaders; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.StorageRequest; -import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.SerializableCoder; -import com.google.cloud.dataflow.sdk.options.GcsOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory; -import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory; -import com.google.cloud.dataflow.sdk.util.IOChannelFactory; -import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.MimeTypes; -import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.common.base.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.channels.WritableByteChannel; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -import javax.annotation.concurrent.NotThreadSafe; - -/** - * Abstract {@link Sink} for file-based output. 
An implementation of FileBasedSink writes file-based - * output and defines the format of output files (how values are written, headers/footers, MIME - * type, etc.). - * - * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink - * and to create a {@link Sink.WriteOperation} that manages the process of writing to the sink. - * - * <p>The process of writing to a file-based sink is as follows: - * <ol> - * <li>An optional subclass-defined initialization, - * <li>a parallel write of bundles to temporary files, and finally, - * <li>these temporary files are renamed with final output filenames. - * </ol> - * - * <p>Supported file systems are those registered with {@link IOChannelUtils}. - * - * @param <T> the type of values written to the sink. - */ -public abstract class FileBasedSink<T> extends Sink<T> { - /** - * Base filename for final output files. - */ - protected final String baseOutputFilename; - - /** - * The extension to be used for the final output files. - */ - protected final String extension; - - /** - * Naming template for output files. See {@link ShardNameTemplate} for a description of - * possible naming templates. Default is {@link ShardNameTemplate#INDEX_OF_MAX}. - */ - protected final String fileNamingTemplate; - - /** - * Construct a FileBasedSink with the given base output filename and extension. - */ - public FileBasedSink(String baseOutputFilename, String extension) { - this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX); - } - - /** - * Construct a FileBasedSink with the given base output filename, extension, and file naming - * template. - * - * <p>See {@link ShardNameTemplate} for a description of file naming templates. - */ - public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) { - this.baseOutputFilename = baseOutputFilename; - this.extension = extension; - this.fileNamingTemplate = fileNamingTemplate; - } - - /** - * Returns the base output filename for this file-based sink. - */ - public String getBaseOutputFilename() { - return baseOutputFilename; - } - - /** - * Perform pipeline-construction-time validation. The default implementation is a no-op. - * Subclasses should override to ensure the sink is valid and can be written to. It is recommended - * to use {@link Preconditions} in the implementation of this method. - */ - @Override - public void validate(PipelineOptions options) {} - - /** - * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write - * to the sink. - */ - @Override - public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options); - - /** - * Abstract {@link Sink.WriteOperation} that manages the process of writing to a - * {@link FileBasedSink}. - * - * <p>The primary responsibility of the FileBasedWriteOperation is the management of output - * files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary file - * locations. After the bundles have been written, - * <ol> - * <li>{@link FileBasedSink.FileBasedWriteOperation#finalize} is given a list of the temporary - * files containing the output bundles. - * <li>During finalize, these temporary files are copied to final output locations and named - * according to a file naming template. - * <li>Finally, any temporary files that were created during the write are removed.
- * </ol> - * - * <p>Subclass implementations of FileBasedWriteOperation must implement - * {@link FileBasedSink.FileBasedWriteOperation#createWriter} to return a concrete - * FileBasedSinkWriter. - * - * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary - * files using the baseTemporaryFilename that can be provided via the constructor of - * FileBasedWriteOperation. These temporary files will be named - * {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle. - * For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a - * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723". - * - * <p>Final output files are written to baseOutputFilename with the format - * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles - * written and extension is the file extension. Both baseOutputFilename and extension are required - * constructor arguments. - * - * <p>Subclass implementations can change the file naming template by supplying a value for - * {@link FileBasedSink#fileNamingTemplate}. - * - * <h2>Temporary Bundle File Handling:</h2> - * <p>{@link FileBasedSink.FileBasedWriteOperation#temporaryFileRetention} controls the behavior - * for managing temporary files. By default, temporary files will be removed. Subclasses can - * provide a different value to the constructor. - * - * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary - * files will occur. - * - * <p>If there are no elements in the PCollection being written, no output will be generated. - * - * @param <T> the type of values written to the sink. - */ - public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class); - - /** - * Options for handling of temporary output files. - */ - public enum TemporaryFileRetention { - KEEP, - REMOVE; - } - - /** - * The Sink that this WriteOperation will write to. - */ - protected final FileBasedSink<T> sink; - - /** - * Option to keep or remove temporary output files. - */ - protected final TemporaryFileRetention temporaryFileRetention; - - /** - * Base filename used for temporary output files. Default is the baseOutputFilename. - */ - protected final String baseTemporaryFilename; - - /** - * Name separator for temporary files. Temporary files will be named - * {@code {baseTemporaryFilename}-temp-{bundleId}}. - */ - protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-"; - - /** - * Build a temporary filename using the temporary filename separator with the given prefix and - * suffix. - */ - protected static final String buildTemporaryFilename(String prefix, String suffix) { - return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + suffix; - } - - /** - * Construct a FileBasedWriteOperation using the same base filename for both temporary and - * output files. - * - * @param sink the FileBasedSink that will be used to configure this write operation. - */ - public FileBasedWriteOperation(FileBasedSink<T> sink) { - this(sink, sink.baseOutputFilename); - } - - /** - * Construct a FileBasedWriteOperation. - * - * @param sink the FileBasedSink that will be used to configure this write operation. - * @param baseTemporaryFilename the base filename to be used for temporary output files. 
- */ - public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) { - this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE); - } - - /** - * Create a new FileBasedWriteOperation. - * - * @param sink the FileBasedSink that will be used to configure this write operation. - * @param baseTemporaryFilename the base filename to be used for temporary output files. - * @param temporaryFileRetention defines how temporary files are handled. - */ - public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename, - TemporaryFileRetention temporaryFileRetention) { - this.sink = sink; - this.baseTemporaryFilename = baseTemporaryFilename; - this.temporaryFileRetention = temporaryFileRetention; - } - - /** - * Clients must implement this method to return a subclass of - * {@link FileBasedSink.FileBasedWriter}. This - * method must satisfy the restrictions placed on implementations of - * {@link Sink.WriteOperation#createWriter}. Namely, it must not mutate the state of the object. - */ - @Override - public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception; - - /** - * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass - * implementations to perform initialization of the sink at pipeline runtime. This method must - * be idempotent and is subject to the same implementation restrictions as - * {@link Sink.WriteOperation#initialize}. - */ - @Override - public void initialize(PipelineOptions options) throws Exception {} - - /** - * Finalizes writing by copying temporary output files to their final location and optionally - * removing temporary files. - * - * <p>Finalization may be overridden by subclass implementations to perform customized - * finalization (e.g., initiating some operation on output bundles, merging them, etc.). - * {@code writerResults} contains the filenames of written bundles. - * - * <p>If subclasses override this method, they must guarantee that its implementation is - * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It - * is a best practice to make this method atomic. - * - * @param writerResults the results of writes (FileResult). - */ - @Override - public void finalize(Iterable<FileResult> writerResults, PipelineOptions options) - throws Exception { - // Collect names of temporary files and rename them. - List<String> files = new ArrayList<>(); - for (FileResult result : writerResults) { - LOG.debug("Temporary bundle output file {} will be copied.", result.getFilename()); - files.add(result.getFilename()); - } - copyToOutputFiles(files, options); - - // Optionally remove temporary files. - if (temporaryFileRetention == TemporaryFileRetention.REMOVE) { - removeTemporaryFiles(options); - } - } - - /** - * Copy temporary files to final output filenames using the file naming template. - * - * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}. - * - * <p>Files will be named according to the file naming template. The order of the output files - * will be the same as the sorted order of the input filenames. In other words, if the input - * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and - * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to - * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc. - * - * @param filenames the filenames of temporary files.
- * @return a list containing the names of final output files. - */ - protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options) - throws IOException { - int numFiles = filenames.size(); - List<String> srcFilenames = new ArrayList<>(); - List<String> destFilenames = generateDestinationFilenames(numFiles); - - // Sort files for copying. - srcFilenames.addAll(filenames); - Collections.sort(srcFilenames); - - if (numFiles > 0) { - LOG.debug("Copying {} files.", numFiles); - FileOperations fileOperations = - FileOperationsFactory.getFileOperations(destFilenames.get(0), options); - fileOperations.copy(srcFilenames, destFilenames); - } else { - LOG.info("No output files to write."); - } - - return destFilenames; - } - - /** - * Generate output bundle filenames. - */ - protected final List<String> generateDestinationFilenames(int numFiles) { - List<String> destFilenames = new ArrayList<>(); - String extension = getSink().extension; - String baseOutputFilename = getSink().baseOutputFilename; - String fileNamingTemplate = getSink().fileNamingTemplate; - - String suffix = getFileExtension(extension); - for (int i = 0; i < numFiles; i++) { - destFilenames.add(IOChannelUtils.constructName( - baseOutputFilename, fileNamingTemplate, suffix, i, numFiles)); - } - return destFilenames; - } - - /** - * Returns the file extension to be used. If the user did not request a file - * extension then this method returns the empty string. Otherwise this method - * adds a {@code "."} to the beginning of the user's extension if one is not present. - */ - private String getFileExtension(String usersExtension) { - if (usersExtension == null || usersExtension.isEmpty()) { - return ""; - } - if (usersExtension.startsWith(".")) { - return usersExtension; - } - return "." + usersExtension; - } - - /** - * Removes temporary output files. Uses the temporary filename to find files to remove. - * - * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}. - * <b>Note:</b> If finalize is overridden and does <b>not</b> rename or otherwise finalize - * temporary files, this method will remove them. - */ - protected final void removeTemporaryFiles(PipelineOptions options) throws IOException { - String pattern = buildTemporaryFilename(baseTemporaryFilename, "*"); - LOG.debug("Finding temporary bundle output files matching {}.", pattern); - FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options); - IOChannelFactory factory = IOChannelUtils.getFactory(pattern); - Collection<String> matches = factory.match(pattern); - LOG.debug("{} temporary files matched {}", matches.size(), pattern); - LOG.debug("Removing {} files.", matches.size()); - fileOperations.remove(matches); - } - - /** - * Provides a coder for {@link FileBasedSink.FileResult}. - */ - @Override - public Coder<FileResult> getWriterResultCoder() { - return SerializableCoder.of(FileResult.class); - } - - /** - * Returns the FileBasedSink for this write operation. - */ - @Override - public FileBasedSink<T> getSink() { - return sink; - } - } - - /** - * Abstract {@link Sink.Writer} that writes a bundle to a {@link FileBasedSink}. Subclass - * implementations provide a method that can write a single value to a {@link WritableByteChannel} - * ({@link Sink.Writer#write}). - * - * <p>Subclass implementations may also override methods that write headers and footers before and - * after the values in a bundle, respectively, as well as provide a MIME type for the output - * channel.
 - * - * <p>Multiple FileBasedWriter instances may be created on the same worker, and therefore any - * access to static members or methods should be thread safe. - * - * @param <T> the type of values to write. - */ - public abstract static class FileBasedWriter<T> extends Writer<T, FileResult> { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class); - - final FileBasedWriteOperation<T> writeOperation; - - /** - * Unique id for this output bundle. - */ - private String id; - - /** - * The filename of the output bundle. Equal to the baseName with - * {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and the bundle id - * appended. - */ - private String filename; - - /** - * The channel to write to. - */ - private WritableByteChannel channel; - - /** - * The MIME type used in the creation of the output channel (if the file system supports it). - * - * <p>GCS, for example, supports writing files with Content-Type metadata. - * - * <p>May be overridden. Default is {@link MimeTypes#TEXT}. See {@link MimeTypes} for other - * options. - */ - protected String mimeType = MimeTypes.TEXT; - - /** - * Construct a new FileBasedWriter with a base filename. - */ - public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) { - Preconditions.checkNotNull(writeOperation); - this.writeOperation = writeOperation; - } - - /** - * Called with the channel that a subclass will write its header, footer, and values to. - * Subclasses should either keep a reference to the channel provided or create and keep a - * reference to an appropriate object that they will use to write to it. - * - * <p>Called before any subsequent calls to writeHeader, writeFooter, and write. - */ - protected abstract void prepareWrite(WritableByteChannel channel) throws Exception; - - /** - * Writes header at the beginning of output files. Nothing by default; subclasses may override. - */ - protected void writeHeader() throws Exception {} - - /** - * Writes footer at the end of output files. Nothing by default; subclasses may override. - */ - protected void writeFooter() throws Exception {} - - /** - * Opens the channel. - */ - @Override - public final void open(String uId) throws Exception { - this.id = uId; - filename = FileBasedWriteOperation.buildTemporaryFilename( - getWriteOperation().baseTemporaryFilename, uId); - LOG.debug("Opening {}.", filename); - channel = IOChannelUtils.create(filename, mimeType); - try { - prepareWrite(channel); - LOG.debug("Writing header to {}.", filename); - writeHeader(); - } catch (Exception e) { - // The caller shouldn't have to close() this Writer if it fails to open(), so close the - // channel if prepareWrite() or writeHeader() fails. - try { - LOG.error("Writing header to {} failed, closing channel.", filename); - channel.close(); - } catch (IOException closeException) { - // Log exception and mask it. - LOG.error("Closing channel for {} failed: {}", filename, closeException.getMessage()); - } - // Throw the exception that caused the write to fail. - throw e; - } - LOG.debug("Starting write of bundle {} to {}.", this.id, filename); - } - - /** - * Closes the channel and returns the bundle result.
- */ - @Override - public final FileResult close() throws Exception { - try (WritableByteChannel theChannel = channel) { - LOG.debug("Writing footer to {}.", filename); - writeFooter(); - } - FileResult result = new FileResult(filename); - LOG.debug("Result for bundle {}: {}", this.id, filename); - return result; - } - - /** - * Return the FileBasedWriteOperation that this Writer belongs to. - */ - @Override - public FileBasedWriteOperation<T> getWriteOperation() { - return writeOperation; - } - } - - /** - * Result of a single bundle write. Contains the filename of the bundle. - */ - public static final class FileResult implements Serializable { - private final String filename; - - public FileResult(String filename) { - this.filename = filename; - } - - public String getFilename() { - return filename; - } - } - - // File system operations - // Warning: These classes are purposefully private and will be replaced by more robust file I/O - // utilities. Not for use outside FileBasedSink. - - /** - * Factory for FileOperations. - */ - private static class FileOperationsFactory { - /** - * Return a FileOperations implementation based on which IOChannel would be used to write to a - * location specification (not necessarily a filename, as it may contain wildcards). - * - * <p>Only supports File and GCS locations (currently, the only factories registered with - * IOChannelUtils). For other locations, an exception is thrown. - */ - public static FileOperations getFileOperations(String spec, PipelineOptions options) - throws IOException { - IOChannelFactory factory = IOChannelUtils.getFactory(spec); - if (factory instanceof GcsIOChannelFactory) { - return new GcsOperations(options); - } else if (factory instanceof FileIOChannelFactory) { - return new LocalFileOperations(); - } else { - throw new IOException("Unrecognized file system."); - } - } - } - - /** - * Copy and Remove operations for files. Operations behave like remove-if-existing and - * copy-if-existing and do not throw exceptions on file not found to enable retries of these - * operations in the case of transient error. - */ - private static interface FileOperations { - /** - * Copy a collection of files from one location to another. - * - * <p>The number of source filenames must equal the number of destination filenames. - * - * @param srcFilenames the source filenames. - * @param destFilenames the destination filenames. - */ - public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException; - - /** - * Remove a collection of files. - */ - public void remove(Collection<String> filenames) throws IOException; - } - - /** - * GCS file system operations. - */ - private static class GcsOperations implements FileOperations { - private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class); - - /** - * Maximum number of requests permitted in a GCS batch request.
- /**
-  * GCS file system operations.
-  */
- private static class GcsOperations implements FileOperations {
-   private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
-
-   /**
-    * Maximum number of requests permitted in a GCS batch request.
-    */
-   private static final int MAX_REQUESTS_PER_BATCH = 1000;
-
-   private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-   private GcsOptions gcsOptions;
-   private Storage gcs;
-   private BatchHelper batchHelper;
-
-   public GcsOperations(PipelineOptions options) {
-     gcsOptions = options.as(GcsOptions.class);
-     gcs = Transport.newStorageClient(gcsOptions).build();
-     batchHelper =
-         new BatchHelper(gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
-   }
-
-   @Override
-   public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-     Preconditions.checkArgument(
-         srcFilenames.size() == destFilenames.size(),
-         String.format("Number of source files %d must equal number of destination files %d",
-             srcFilenames.size(), destFilenames.size()));
-     for (int i = 0; i < srcFilenames.size(); i++) {
-       final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
-       final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
-       LOG.debug("Copying {} to {}", sourcePath, destPath);
-       Storage.Objects.Copy copyObject = gcs.objects().copy(sourcePath.getBucket(),
-           sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
-       batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
-         @Override
-         public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-           LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
-         }
-
-         @Override
-         public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
-             throws IOException {
-           // Do nothing on item not found.
-           if (!errorExtractor.itemNotFound(e)) {
-             throw new IOException(e.toString());
-           }
-           LOG.debug("{} does not exist.", sourcePath);
-         }
-       });
-     }
-     batchHelper.flush();
-   }
-
-   @Override
-   public void remove(Collection<String> filenames) throws IOException {
-     for (String filename : filenames) {
-       final GcsPath path = GcsPath.fromUri(filename);
-       LOG.debug("Removing {}", path);
-       Storage.Objects.Delete deleteObject =
-           gcs.objects().delete(path.getBucket(), path.getObject());
-       batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
-         @Override
-         public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
-           LOG.debug("Successfully removed {}", path);
-         }
-
-         @Override
-         public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
-             throws IOException {
-           // Do nothing on item not found.
-           if (!errorExtractor.itemNotFound(e)) {
-             throw new IOException(e.toString());
-           }
-           LOG.debug("{} does not exist.", path);
-         }
-       });
-     }
-     batchHelper.flush();
-   }
- }
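Note that the precondition messages in both copy() implementations are built with String.format, which only expands %-style conversions; the SLF4J-style {} placeholders that originally appeared there are expanded by the logger, not by the formatter, and would have been printed literally. The messages are corrected to %d above. A one-line contrast, with copied and total as illustrative locals:

    String.format("copied %d of %d files", copied, total);  // %d expanded by java.util.Formatter
    LOG.debug("copied {} of {} files", copied, total);      // {} expanded lazily by SLF4J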
- /**
-  * Local file system operations, implemented with {@link Files}.
-  */
- private static class LocalFileOperations implements FileOperations {
-   private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
-
-   @Override
-   public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-     Preconditions.checkArgument(
-         srcFilenames.size() == destFilenames.size(),
-         String.format("Number of source files %d must equal number of destination files %d",
-             srcFilenames.size(), destFilenames.size()));
-     int numFiles = srcFilenames.size();
-     for (int i = 0; i < numFiles; i++) {
-       String src = srcFilenames.get(i);
-       String dst = destFilenames.get(i);
-       LOG.debug("Copying {} to {}", src, dst);
-       copyOne(src, dst);
-     }
-   }
-
-   private void copyOne(String source, String destination) throws IOException {
-     try {
-       // Copy the source file, replacing the existing destination.
-       Files.copy(Paths.get(source), Paths.get(destination),
-           StandardCopyOption.REPLACE_EXISTING);
-     } catch (NoSuchFileException e) {
-       // Suppress the exception if the source file does not exist.
-       LOG.debug("{} does not exist.", source);
-     }
-   }
-
-   @Override
-   public void remove(Collection<String> filenames) throws IOException {
-     for (String filename : filenames) {
-       LOG.debug("Removing file {}", filename);
-       removeOne(filename);
-     }
-   }
-
-   private void removeOne(String filename) throws IOException {
-     // Delete the file if it exists.
-     boolean deleted = Files.deleteIfExists(Paths.get(filename));
-     if (!deleted) {
-       LOG.debug("{} does not exist.", filename);
-     }
-   }
- }
-
- /**
-  * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
-  *
-  * <p>Copy of
-  * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
-  *
-  * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
-  * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage
-  * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this
-  * project and others may use the connector without introducing unnecessary dependencies.
-  *
-  * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded
-  * logical grouping of requests.
-  */
- @NotThreadSafe
- private static class BatchHelper {
-   /**
-    * Callback that causes a single StorageRequest to be added to the BatchRequest.
-    */
-   protected static interface QueueRequestCallback {
-     void enqueue() throws IOException;
-   }
-
-   private final List<QueueRequestCallback> pendingBatchEntries;
-   private final BatchRequest batch;
-
-   // Number of requests that can be queued into a single actual HTTP request
-   // before a sub-batch is sent.
-   private final long maxRequestsPerBatch;
-
-   // Flag that indicates whether there is an in-progress flush.
-   private boolean flushing = false;
-
-   /**
-    * Primary constructor, generally accessed only via the inner Factory class.
-    */
-   public BatchHelper(
-       HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
-     this.pendingBatchEntries = new LinkedList<>();
-     this.batch = gcs.batch(requestInitializer);
-     this.maxRequestsPerBatch = maxRequestsPerBatch;
-   }
-
-   /**
-    * Adds an additional request to the batch, and possibly flushes the current contents of the
-    * batch if {@code maxRequestsPerBatch} has been reached.
-    */
-   public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
-       throws IOException {
-     QueueRequestCallback queueCallback = new QueueRequestCallback() {
-       @Override
-       public void enqueue() throws IOException {
-         req.queue(batch, callback);
-       }
-     };
-     pendingBatchEntries.add(queueCallback);
-
-     flushIfPossibleAndRequired();
-   }
-
-   // Flush our buffer if we have more pending entries than maxRequestsPerBatch.
-   private void flushIfPossibleAndRequired() throws IOException {
-     if (pendingBatchEntries.size() > maxRequestsPerBatch) {
-       flushIfPossible();
-     }
-   }
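A hedged sketch of the intended call pattern, as GcsOperations.remove above uses it; the objectsToDelete list and deleteCallback are assumptions for illustration. Requests are queued one at a time, the helper sends full sub-batches as the threshold is crossed, and a final flush() sends any remainder below maxRequestsPerBatch.

    // Hypothetical caller code; gcs is a com.google.api.services.storage.Storage client.
    BatchHelper helper = new BatchHelper(
        gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
    for (StorageObject object : objectsToDelete) {
      helper.queue(
          gcs.objects().delete(object.getBucket(), object.getName()), deleteCallback);
    }
    helper.flush();  // sends whatever is still pending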
-   // Flush our buffer if we are not already in a flush operation and we have data to flush.
-   private void flushIfPossible() throws IOException {
-     if (!flushing && pendingBatchEntries.size() > 0) {
-       flushing = true;
-       try {
-         while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
-           QueueRequestCallback head = pendingBatchEntries.remove(0);
-           head.enqueue();
-         }
-
-         batch.execute();
-       } finally {
-         flushing = false;
-       }
-     }
-   }
-
-   /**
-    * Sends any currently remaining requests in the batch; should be called at the end of any
-    * series of batched requests to ensure everything has been sent.
-    */
-   public void flush() throws IOException {
-     flushIfPossible();
-   }
- }
-
- static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
-   @Override
-   public PCollection<T> apply(PCollection<T> input) {
-     return input
-         // TODO: This would need to be adapted to write per-window shards.
-         .apply(Window.<T>into(new GlobalWindows())
-             .triggering(DefaultTrigger.of())
-             .discardingFiredPanes())
-         .apply("RandomKey", ParDo.of(
-             new DoFn<T, KV<Long, T>>() {
-               transient long counter, step;
-
-               @Override
-               public void startBundle(Context c) {
-                 // Start from a random key and advance by a random odd step; an odd step is
-                 // coprime with 2^64, so keys never repeat within a bundle.
-                 counter = (long) (Math.random() * Long.MAX_VALUE);
-                 step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
-               }
-
-               @Override
-               public void processElement(ProcessContext c) {
-                 counter += step;
-                 c.output(KV.of(counter, c.element()));
-               }
-             }))
-         .apply(GroupByKey.<Long, T>create())
-         .apply("Ungroup", ParDo.of(
-             new DoFn<KV<Long, Iterable<T>>, T>() {
-               @Override
-               public void processElement(ProcessContext c) {
-                 for (T item : c.element().getValue()) {
-                   c.output(item);
-                 }
-               }
-             }));
-   }
- }
-}
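ReshardForWrite tags every element with a pseudo-random key, groups by that key, and immediately discards it; the GroupByKey forces a shuffle, which breaks fusion with upstream stages and redistributes elements across workers before the write. A hedged usage sketch, with the TextIO source and destination paths as assumptions for illustration:

    // Hypothetical pipeline; assumes the Dataflow SDK's TextIO transforms.
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("gs://bucket/input-*"))
        .apply(new ReshardForWrite<String>())
        .apply(TextIO.Write.to("gs://bucket/output"));
    p.run();

The random-key-then-ungroup trick trades one full shuffle of the data for better write parallelism; as the TODO above notes, it would need per-window keys to support windowed writes.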
