dariabezkorovaina commented on code in PR #30749:
URL: https://github.com/apache/beam/pull/30749#discussion_r1542034794
##########
learning/prompts/code-generation/java/04_io_bigtable.md:
##########
@@ -0,0 +1,132 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Google Cloud Bigtable
table using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a Bigtable table using the Apache
Beam BigtableIO connector.
+Here is an example of how to use the BigtableIO connector to accomplish this:
+
+```java
+ package bigtable;
+
+ import com.google.bigtable.v2.Mutation;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.primitives.Ints;
+ import com.google.protobuf.ByteString;
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+ import org.apache.beam.sdk.options.Default;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.Create;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+ import org.apache.beam.sdk.values.KV;
+ import org.checkerframework.checker.nullness.qual.Nullable;
+ import org.joda.time.Instant;
+
+ import java.math.BigInteger;
+ import java.util.stream.IntStream;
+
+ // Pipeline to write data to BigTable table
+ public class BigTableWriteTable {
+
+ // Pipeline options for writing to BigTable table
+ public interface BigTableWriteTableOptions extends
DataflowPipelineOptions {
+ @Description("BigTable Instance ID")
+ @Default.String("bigtable-instance")
+ String getInstanceId();
+
+ void setInstanceId(String value);
+
+ @Description("BigTable Table Name")
+ @Default.String("bigtable-table")
+ String getTableName();
+
+ void setTableName(String value);
+
+ @Nullable
+ @Description("BigTable Project ID")
+ String getBigTableProject();
+
+ void setBigTableProject(String value);
+ }
+
+ public static void main(String[] args) {
+
+ // Parse the pipeline options from the command line.
+ BigTableWriteTableOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigTableWriteTableOptions.class);
+
+ // Set the project ID to the BigTable project ID if it is not null,
otherwise set it to the project ID
+ String project = (options.getBigTableProject() == null) ?
options.getProject() : options.getBigTableProject();
+
+ // Create the pipeline
+ Pipeline p = Pipeline.create(options);
+
+ // Sample data to write to BigTable
+ int[] rangeIntegers = IntStream.range(2, 1000).toArray();
+ Iterable<Integer> elements = Ints.asList(rangeIntegers);
+
+ p
+ // Create a PCollection from the sample data
+ .apply(Create.of(elements))
+ // Apply a ParDo transformation to create mutations
+ .apply("Group in Mutations", ParDo.of(new CreateMutations()))
+ // Write the mutations to BigTable table
+ .apply(BigtableIO.write()
+ .withInstanceId(options.getInstanceId())
+ .withProjectId(project)
+ .withTableId(options.getTableName()));
+
+ // Execute the pipeline.
+ p.run();
+ }
+
+ // Class to create mutations for each element in the PCollection
+ public static class CreateMutations extends DoFn<Integer, KV<ByteString,
Iterable<Mutation>>> {
+ public ImmutableList.Builder<Mutation> mutations;
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+
+ BigInteger b = new BigInteger(c.element().toString());
+ String isPrime = b.isProbablePrime(1) ? "Prime" : "Composite";
+
+ // Set the cell value for the element
+ Mutation.SetCell setCell =
+ Mutation.SetCell.newBuilder()
+ // Set the family name to the prime status
+ .setFamilyName(isPrime)
+ // Set the column qualifier to the element value
+ .setColumnQualifier(toByteString(c.element().toString()))
+ // Set the cell value to the element value
+ .setValue(toByteString("value-" + c.element()))
+ // Set the timestamp to the current time
+ .setTimestampMicros(Instant.now().getMillis() * 1000)
+ .build();
+ // Add the mutation to the list of mutations
+
this.mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
+ }
+
+ // Initialize the list of mutations
+ @StartBundle
+ public void startBundle() {
+ this.mutations = ImmutableList.builder();
+ }
+
+ // Finish the bundle and output the mutations
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) {
+ KV<ByteString, Iterable<Mutation>> row =
KV.of(toByteString("numbers"), this.mutations.build());
+ c.output(row, Instant.now(), GlobalWindow.INSTANCE);
+ }
+ // Convert a string to a ByteString
+ private static ByteString toByteString(String value) {
+ return ByteString.copyFromUtf8(value);
+ }
+ }
+ }
+
+```
+This code snippet utilizes the pipeline options pattern to parse command-line
arguments.
Review Comment:
```suggestion
This code snippet utilizes the pipeline options pattern to parse
command-line arguments.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]