pabloem commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r589778288



##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read 
data from one of
+the supported sources, tokenize data with external API calls to remote RPC 
server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that 
will read data in CSV or
+JSON format from a specified input source, send the data to an external 
processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- 1 of supported sources to read data from
+- 1 of supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task 
to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows to run the pipeline via the following command:
+
+```bash
+gradle clean execute 
-DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization 
\
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+

Review comment:
       these instructions are nice and useful. I worry that users will not find 
out about this example. Do you have plans to blog about it, or add any extra 
documentation for it?

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import 
org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import 
org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonArray;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DataProtectors} Using passing parameters transform will buffer 
input rows in batch and
+ * will send it when the count of buffered rows will equal specified batch 
size. When it takes the
+ * last one batch, it will send it when the last row will come to doFn even 
count of buffered rows
+ * will less than the batch size.
+ */
+public class DataProtectors {
+
+  /** Logger for class. */
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataProtectors.class);
+
+  public static final String ID_FIELD_NAME = "ID";
+
+  /**
+   * The {@link RowToTokenizedRow} transform converts {@link Row} to {@link 
TableRow} objects. The
+   * transform accepts a {@link FailsafeElement} object so the original 
payload of the incoming
+   * record can be maintained across multiple series of transforms.
+   */
+  @AutoValue
+  public abstract static class RowToTokenizedRow<T>
+      extends PTransform<PCollection<KV<Integer, Row>>, PCollectionTuple> {
+
+    public static <T> Builder<T> newBuilder() {
+      return new AutoValue_DataProtectors_RowToTokenizedRow.Builder<>();
+    }
+
+    public abstract TupleTag<Row> successTag();
+
+    public abstract TupleTag<FailsafeElement<Row, Row>> failureTag();
+
+    public abstract Schema schema();
+
+    public abstract int batchSize();
+
+    public abstract String rpcURI();
+
+    @Override
+    public PCollectionTuple expand(PCollection<KV<Integer, Row>> inputRows) {
+      FailsafeElementCoder<Row, Row> coder =
+          FailsafeElementCoder.of(RowCoder.of(schema()), 
RowCoder.of(schema()));
+      PCollectionTuple pCollectionTuple =
+          inputRows.apply(
+              "Tokenize",
+              ParDo.of(new TokenizationFn(schema(), batchSize(), rpcURI(), 
failureTag()))
+                  .withOutputTags(successTag(), 
TupleTagList.of(failureTag())));
+      return PCollectionTuple.of(
+              successTag(), 
pCollectionTuple.get(successTag()).setRowSchema(schema()))
+          .and(failureTag(), 
pCollectionTuple.get(failureTag()).setCoder(coder));
+    }
+
+    /** Builder for {@link RowToTokenizedRow}. */
+    @AutoValue.Builder
+    public abstract static class Builder<T> {
+
+      public abstract Builder<T> setSuccessTag(TupleTag<Row> successTag);
+
+      public abstract Builder<T> setFailureTag(TupleTag<FailsafeElement<Row, 
Row>> failureTag);
+
+      public abstract Builder<T> setSchema(Schema schema);
+
+      public abstract Builder<T> setBatchSize(int batchSize);
+
+      public abstract Builder<T> setRpcURI(String rpcURI);
+
+      public abstract RowToTokenizedRow<T> build();
+    }
+  }
+
+  /** Class implements stateful doFn for data tokenization using remote RPC. */
+  @SuppressWarnings("initialization.static.fields.uninitialized")
+  public static class TokenizationFn extends DoFn<KV<Integer, Row>, Row> {
+
+    private static Schema schemaToRpc;
+    private static CloseableHttpClient httpclient;
+    private static ObjectMapper objectMapperSerializerForSchema;
+    private static ObjectMapper objectMapperDeserializerForSchema;
+
+    private final Schema schema;
+    private final int batchSize;
+    private final String rpcURI;
+    private final TupleTag<FailsafeElement<Row, Row>> failureTag;
+
+    @StateId("buffer")
+    private final StateSpec<BagState<Row>> bufferedEvents;
+
+    @StateId("count")
+    private final StateSpec<ValueState<Integer>> countState = 
StateSpecs.value();
+
+    @TimerId("expiry")
+    private final TimerSpec expirySpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private Map<String, Row> inputRowsWithIds;
+
+    public TokenizationFn(
+        Schema schema,
+        int batchSize,
+        String rpcURI,
+        TupleTag<FailsafeElement<Row, Row>> failureTag) {
+      this.schema = schema;
+      this.batchSize = batchSize;
+      this.rpcURI = rpcURI;
+      bufferedEvents = StateSpecs.bag(RowCoder.of(schema));
+      this.failureTag = failureTag;
+      this.inputRowsWithIds = new HashMap<>();
+    }
+
+    @Setup
+    public void setup() {
+
+      List<Field> fields = schema.getFields();
+      fields.add(Field.of(ID_FIELD_NAME, FieldType.STRING));
+      schemaToRpc = new Schema(fields);
+
+      objectMapperSerializerForSchema =
+          
RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schemaToRpc));
+
+      objectMapperDeserializerForSchema =
+          
RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schemaToRpc));
+
+      httpclient = HttpClients.createDefault();
+    }
+
+    @Teardown
+    public void close() {
+      try {
+        httpclient.close();
+      } catch (IOException exception) {
+        String exceptionMessage = exception.getMessage();
+        if (exceptionMessage != null) {
+          LOG.warn("Can't close connection: {}", exceptionMessage);
+        }
+      }
+    }
+
+    @OnTimer("expiry")
+    public void onExpiry(OnTimerContext context, @StateId("buffer") 
BagState<Row> bufferState) {
+      boolean isEmpty = firstNonNull(bufferState.isEmpty().read(), true);
+      if (!isEmpty) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId("buffer") BagState<Row> bufferState,
+        @StateId("count") ValueState<Integer> countState,
+        @TimerId("expiry") Timer expiryTimer) {
+
+      expiryTimer.set(window.maxTimestamp());
+
+      int count = firstNonNull(countState.read(), 0);
+      count++;
+      countState.write(count);
+      bufferState.add(context.element().getValue());
+
+      if (count >= batchSize) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+        countState.clear();
+      }
+    }
+
+    @SuppressWarnings("argument.type.incompatible")
+    private void processBufferedRows(Iterable<Row> rows, WindowedContext 
context) {

Review comment:
       I see that this DoFn does a lot of its own buffering. Have you 
considered using GroupIntoBatches[1] for this? GroupIntoBatches has the same 
sort of buffering/counting/timer emission logic, but it receives more updates 
(e.g. it recently started supporting 'autosharding' which lets runners decouple 
the number of shards from the number of keys).
   
   Think about it - but I recommend you try using GroupIntoBatches, as you will 
get some extra nice benefits from it.
   
   [1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/GroupIntoBatches.html

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/FileSystemIO.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import 
org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.examples.complete.datatokenization.utils.CsvConverters;
+import 
org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
+import 
org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import 
org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link FileSystemIO} class to read/write data from/into File Systems. 
*/
+public class FileSystemIO {

Review comment:
       is this class meant to be generic? Or specific for this template? I see 
the class is within the template package - I am just wondering if we should 
name the class TokenizationFileIO or something that states clearly that these 
transforms are meant only to be used for the data tokenization template?

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigQueryIO.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import 
org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigQueryIO} class for writing data from template to BigTable. */
+public class BigQueryIO {

Review comment:
       Same as FileSystemIO

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read 
data from one of
+the supported sources, tokenize data with external API calls to remote RPC 
server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that 
will read data in CSV or
+JSON format from a specified input source, send the data to an external 
processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- 1 of supported sources to read data from
+- 1 of supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task 
to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows to run the pipeline via the following command:
+
+```bash
+gradle clean execute 
-DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization 
\
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Data schema
+    - **dataSchemaPath**: Path to data schema (JSON format) compatible with 
BigQuery.
+- 1 specified input source out of these:
+    - File System
+        - **inputFilePattern**: Filepattern for files to read data from
+        - **inputFileFormat**: File format of input files. Supported formats: 
JSON, CSV
+        - In case if input data is in CSV format:
+            - **csvContainsHeaders**: `true` if file(s) in bucket to read data 
from contain headers,
+              and `false` otherwise
+            - **csvDelimiter**: Delimiting character in CSV. Default: use 
delimiter provided in
+              csvFormat
+            - **csvFormat**: Csv format according to Apache Commons CSV 
format. Default is:
+              [Apache Commons CSV 
default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT)
+              . Must match format names exactly found
+              at: 
https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+    - Google Pub/Sub
+        - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format 
of '
+          projects/yourproject/topics/yourtopic'
+- 1 specified output sink out of these:
+    - File System
+        - **outputDirectory**: Directory to write data to
+        - **outputFileFormat**: File format of output files. Supported 
formats: JSON, CSV
+        - **windowDuration**: The window duration in which data will be 
written. Should be specified
+          only for 'Pub/Sub -> GCS' case. Defaults to 30s.
+
+          Allowed formats are:
+            - Ns (for seconds, example: 5s),
+            - Nm (for minutes, example: 12m),
+            - Nh (for hours, example: 2h).
+    - Google Cloud BigQuery
+        - **bigQueryTableName**: Cloud BigQuery table name to write into
+        - **tempLocation**: Folder in a Google Cloud Storage bucket, which is 
needed for
+          BigQuery to handle data writing
+    - Cloud BigTable
+        - **bigTableProjectId**: Id of the project where the Cloud BigTable 
instance to write into
+          is located
+        - **bigTableInstanceId**: Id of the Cloud BigTable instance to write 
into
+        - **bigTableTableId**: Id of the Cloud BigTable table to write into
+        - **bigTableKeyColumnName**: Column name to use as a key in Cloud 
BigTable
+        - **bigTableColumnFamilyName**: Column family name to use in Cloud 
BigTable
+- RPC server parameters
+    - **rpcUri**: URI for the API calls to RPC server
+    - **batchSize**: Size of the batch to send to RPC server per request
+
+The template allows for the user to supply the following optional parameter:
+
+- **nonTokenizedDeadLetterPath**: Folder where failed to tokenize data will be 
stored
+
+
+in the following format:
+
+```bash
+--dataSchemaPath="path-to-data-schema-in-json-format"
+--inputFilePattern="path-pattern-to-input-data"
+--outputDirectory="path-to-output-directory"
+# example for CSV case
+--inputFileFormat="CSV"
+--outputFileFormat="CSV"
+--csvContainsHeaders="true"
+--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+--batchSize=batch-size-number
+--rpcUri=http://host:port/tokenize
+```
+
+By default, this will run the pipeline locally with the DirectRunner. To 
change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more 
information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as Google Dataflow Template, which you can build and 
run using Google Cloud Platform. See
+this template documentation 
[README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md)
 for

Review comment:
       I don't see anything in this address. Will it be added later?

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read 
data from one of
+the supported sources, tokenize data with external API calls to remote RPC 
server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that 
will read data in CSV or
+JSON format from a specified input source, send the data to an external 
processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- 1 of supported sources to read data from
+- 1 of supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task 
to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows to run the pipeline via the following command:
+
+```bash
+gradle clean execute 
-DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization 
\
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Data schema
+    - **dataSchemaPath**: Path to data schema (JSON format) compatible with 
BigQuery.
+- 1 specified input source out of these:
+    - File System
+        - **inputFilePattern**: Filepattern for files to read data from
+        - **inputFileFormat**: File format of input files. Supported formats: 
JSON, CSV
+        - In case if input data is in CSV format:
+            - **csvContainsHeaders**: `true` if file(s) in bucket to read data 
from contain headers,
+              and `false` otherwise
+            - **csvDelimiter**: Delimiting character in CSV. Default: use 
delimiter provided in
+              csvFormat
+            - **csvFormat**: Csv format according to Apache Commons CSV 
format. Default is:
+              [Apache Commons CSV 
default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT)
+              . Must match format names exactly found
+              at: 
https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+    - Google Pub/Sub
+        - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format 
of '
+          projects/yourproject/topics/yourtopic'
+- 1 specified output sink out of these:
+    - File System
+        - **outputDirectory**: Directory to write data to
+        - **outputFileFormat**: File format of output files. Supported 
formats: JSON, CSV
+        - **windowDuration**: The window duration in which data will be 
written. Should be specified
+          only for 'Pub/Sub -> GCS' case. Defaults to 30s.
+
+          Allowed formats are:
+            - Ns (for seconds, example: 5s),
+            - Nm (for minutes, example: 12m),
+            - Nh (for hours, example: 2h).
+    - Google Cloud BigQuery
+        - **bigQueryTableName**: Cloud BigQuery table name to write into
+        - **tempLocation**: Folder in a Google Cloud Storage bucket, which is 
needed for
+          BigQuery to handle data writing
+    - Cloud BigTable
+        - **bigTableProjectId**: Id of the project where the Cloud BigTable 
instance to write into
+          is located
+        - **bigTableInstanceId**: Id of the Cloud BigTable instance to write 
into
+        - **bigTableTableId**: Id of the Cloud BigTable table to write into
+        - **bigTableKeyColumnName**: Column name to use as a key in Cloud 
BigTable
+        - **bigTableColumnFamilyName**: Column family name to use in Cloud 
BigTable
+- RPC server parameters
+    - **rpcUri**: URI for the API calls to RPC server
+    - **batchSize**: Size of the batch to send to RPC server per request
+
+The template allows for the user to supply the following optional parameter:
+
+- **nonTokenizedDeadLetterPath**: Folder where failed to tokenize data will be 
stored
+
+
+in the following format:
+
+```bash
+--dataSchemaPath="path-to-data-schema-in-json-format"
+--inputFilePattern="path-pattern-to-input-data"
+--outputDirectory="path-to-output-directory"
+# example for CSV case
+--inputFileFormat="CSV"
+--outputFileFormat="CSV"
+--csvContainsHeaders="true"
+--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+--batchSize=batch-size-number
+--rpcUri=http://host:port/tokenize
+```
+
+By default, this will run the pipeline locally with the DirectRunner. To 
change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more 
information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as Google Dataflow Template, which you can build and 
run using Google Cloud Platform. See
+this template documentation 
[README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md)
 for

Review comment:
       I am curious how the template here and in DataflowTemplates will be 
different?

##########
File path: 
examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigTableIO.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import 
org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteResult;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigTableIO} class for writing data from template to BigTable. */
+public class BigTableIO {

Review comment:
       Same question as with FileSystemIO




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to