This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d7eaaf07191 Duet AI code-explanation and code-generation prompts basic 
(#30686)
d7eaaf07191 is described below

commit d7eaaf07191ef0737df5c37daeb597424d5a865b
Author: Oleh Borysevych <[email protected]>
AuthorDate: Fri Mar 22 16:11:07 2024 +0100

    Duet AI code-explanation and code-generation prompts basic (#30686)
    
    * parquet added
    
    * csv and avro added
    
    * Apply suggestions from code review
    
    Co-authored-by: Daria Bezkorovaina 
<[email protected]>
    
    * whitespace fixed
    
    * Apply suggestions from code review
    
    Co-authored-by: Daria Bezkorovaina 
<[email protected]>
    
    ---------
    
    Co-authored-by: Daria Bezkorovaina 
<[email protected]>
---
 .../prompts/code-explanation/java/07_io_json.md    | 138 +++++++++++++++++++
 .../prompts/code-explanation/java/08_io_csv.md     | 138 +++++++++++++++++++
 .../prompts/code-explanation/java/09_io_avro.md    |  92 +++++++++++++
 .../prompts/code-explanation/java/10_io_parquet.md | 153 +++++++++++++++++++++
 .../prompts/code-generation/java/07_io_json.md     |  87 ++++++++++++
 learning/prompts/code-generation/java/08_io_csv.md |  78 +++++++++++
 .../prompts/code-generation/java/09_io_avro.md     |  68 +++++++++
 .../prompts/code-generation/java/10_io_parquet.md  | 130 +++++++++++++++++
 8 files changed, 884 insertions(+)

diff --git a/learning/prompts/code-explanation/java/07_io_json.md 
b/learning/prompts/code-explanation/java/07_io_json.md
new file mode 100644
index 00000000000..42a6c7a708a
--- /dev/null
+++ b/learning/prompts/code-explanation/java/07_io_json.md
@@ -0,0 +1,138 @@
+Prompt:
+What does this code do?
+
+```java
+package gcs;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.json.JsonIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class WriteJsonIO {
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class SampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount
+
+        public SampleRecord() {
+        }
+
+        public SampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+    }
+
+    public interface WriteJsonOptions extends PipelineOptions {
+
+        @Description("A file path to write JSON files to")
+        @Validation.Required
+        String getFilePath();
+
+        void setFilePath(String filePath);
+    }
+
+    public static void main(String[] args) {
+.
+        WriteJsonOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteJsonOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        List<SampleRecord> rows =
+            Arrays.asList(
+                new SampleRecord(1, "January", "$1000"),
+                new SampleRecord(2, "February", "$2000"),
+                new SampleRecord(3, "March", "$3000"));
+
+        p.apply("Create Records", Create.of(rows))
+            .apply(
+                "Write Records to JSON File",
+                    JsonIO.<SampleRecord>write(options.getFilePath())
+                        .withNumShards(1));
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam JsonIO connector to write a list of 
`SampleRecord` objects to a JSON file.
+
+The `SampleRecord` class defines the schema for the JSON records and is 
annotated with `@DefaultSchema(JavaFieldSchema.class)` to indicate that its 
fields should be used as the schema for the JSON records.
+
+```java
+ @DefaultSchema(JavaFieldSchema.class)
+    public static class SampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount
+
+        public SampleRecord() {
+        }
+
+        public SampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+    }
+```
+
+The `WriteJsonOptions` interface specifies command-line options for 
configuring the output file path. The `@Description` annotation provides a 
description of the option, and `@Validation.Required` indicates that the option 
is required.
+
+```java
+public interface WriteJsonOptions extends PipelineOptions {
+
+        @Description("A file path to write JSON files to")
+        @Validation.Required
+        String getFilePath();
+
+        void setFilePath(String filePath);
+    }
+```
+
+The `PipelineOptionsFactory` class generates `PipelineOptions` from 
command-line arguments. The `Pipeline.create` method creates a data pipeline, 
defined as a sequence of transformations to be applied to the data.
+
+```java
+  WriteJsonOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteJsonOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The pipeline generates a list of `SampleRecord` objects and applies the 
`Create` transform to create a `PCollection` from this list. Subsequently, the 
`JsonIO.write` transform is used to write the `PCollection` to a JSON file, 
with the `withNumShards` method specifying the number of output shards.
+
+```java
+        List<SampleRecord> rows =
+            Arrays.asList(
+                new SampleRecord(1, "January", "$1000"),
+                new SampleRecord(2, "February", "$2000"),
+                new SampleRecord(3, "March", "$3000"));
+
+        p.apply("Create Records", Create.of(rows))
+            .apply(
+                "Write Records to JSON File",
+                    JsonIO.<SampleRecord>write(options.getFilePath())
+                        .withNumShards(1));
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the 
pipeline.
+
+```java
+        p.run();
+```
diff --git a/learning/prompts/code-explanation/java/08_io_csv.md 
b/learning/prompts/code-explanation/java/08_io_csv.md
new file mode 100644
index 00000000000..567d82c0c10
--- /dev/null
+++ b/learning/prompts/code-explanation/java/08_io_csv.md
@@ -0,0 +1,138 @@
+Prompt:
+What does this code do?
+
+```java
+public class WriteCsvFile {
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class ExampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount;
+
+        public ExampleRecord() {
+        }
+
+        public ExampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount
+        }
+    }
+
+    public interface WriteCsvFileOptions extends PipelineOptions {
+
+        @Description("A file path to write CSV files to")
+        @Validation.Required
+        String getFilePath();
+
+        void setFilePath(String filePath);
+    }
+
+    public static void main(String[] args) {
+        WriteCsvFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteCsvFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        List<ExampleRecord> rows =
+            Arrays.asList(
+                new ExampleRecord(1, "January", "$1000"),
+                new ExampleRecord(2, "February", "$2000"),
+                new ExampleRecord(3, "March", "$3000"));
+
+        CSVFormat csvFormat =
+            CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache 
Beam")
+                .withCommentMarker('#');
+
+        p.apply("Create collection", Create.of(rows))
+            .apply(
+                "Write to CSV file",
+                    CsvIO.<ExampleRecord>write(options.getFilePathPrefix(), 
csvFormat)
+                        .withNumShards(1));
+        p.run();
+    }
+}
+
+```
+
+Response:
+This code snippet reads data from a CSV file using the `ReadFromCsv` transform 
from the CsvIO connector provided by Apache Beam.
+
+```java
+ @DefaultSchema(JavaFieldSchema.class)
+    public static class ExampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount;
+
+        public ExampleRecord() {
+        }
+
+        public ExampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount
+        }
+    }
+```
+
+The `ExampleRecord` class represents a row in the CSV file, defining three 
fields: `id`, `month`, and `amount`. The 
`@DefaultSchema(JavaFieldSchema.class)` annotation instructs the Apache Beam 
SDK to use the `JavaFieldSchema` class to generate the schema for the 
`ExampleRecord` class.
+
+```java
+public interface WriteCsvFileOptions extends PipelineOptions {
+
+        @Description("A file path to write CSV files to")
+        @Validation.Required
+        String getFilePath();
+
+        void setFilePath(String filePath);
+    }
+```
+
+The `WriteCsvFileOptions` interface defines a custom option for specifying the 
file path to write the CSV files to.
+
+```java
+  WriteCsvFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteCsvFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The `Pipeline.create` method creates a data pipeline using the options defined 
in the `WriteCsvFileOptions` class.
+
+```java
+   List<ExampleRecord> rows =
+            Arrays.asList(
+                new ExampleRecord(1, "January", "$1000"),
+                new ExampleRecord(2, "February", "$2000"),
+                new ExampleRecord(3, "March", "$3000"));
+```
+
+Subsequently, the code snippet creates a list of `ExampleRecord` objects to be 
written to the CSV file.
+
+```java
+        CSVFormat csvFormat =
+            CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache 
Beam")
+                .withCommentMarker('#');
+```
+
+To write the data to a CSV file, the pipeline creates a `CSVFormat` object 
with a header comment and a comment marker.
+
+```java
+        p.apply("Create collection", Create.of(rows))
+            .apply(
+                "Write to CSV file",
+                    CsvIO.<ExampleRecord>write(options.getFilePathPrefix(), 
csvFormat)
+                        .withNumShards(1));
+```
+
+The code applies the `Create` transform to generate a collection of 
`ExampleRecord` objects. Then, the `CsvIO.write` transform is applied to write 
the collection to a CSV file, with the `withNumShards` method specifying the 
number of shards to use when writing the file.
+
+```java
+        p.run();
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the 
pipeline.
diff --git a/learning/prompts/code-explanation/java/09_io_avro.md 
b/learning/prompts/code-explanation/java/09_io_avro.md
new file mode 100644
index 00000000000..569633f4167
--- /dev/null
+++ b/learning/prompts/code-explanation/java/09_io_avro.md
@@ -0,0 +1,92 @@
+Prompt:
+What does this code do?
+
+```java
+public class ReadGenericAvroIO {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadGenericAvroIO.class);
+
+    public interface ReadAvroOptions extends PipelineOptions {
+        @Description("Path to the Avro file to read from")
+        
@Default.String("gs://<your-bucket-name]/[your-object-name]/filename.avro")
+        String getInput();
+
+        void setInput(String value);
+    }
+
+    public static void main(String[] args) {
+        ReadAvroOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        String fields = "[{ \"name\": \"months\", \"type\": \"string\" }, { 
\"name\": \"amount\", \"type\": \"string\" }]";
+        String schema = "{\"type\": \"record\", \"name\": 
\"AvroRecord\",\"fields\":" + fields + "}";
+        Schema avroSchema = Schema.parse(schema);
+
+        p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+         .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+                GenericRecord record = c.element();
+                String message = "Month: " + record.get("month") + ". Amount: 
" + record.get("amount");
+                LOG.info(message);
+                c.output(message);
+            }
+        }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code reads data from an Apache Avro file using the `ReadFromAvro` 
transform from the AvroIO connector provided by Apache Beam.
+
+```java
+    public interface ReadAvroOptions extends PipelineOptions {
+        @Description("Path to the Avro file to read from")
+        
@Default.String("gs://<your-bucket-name]/[your-object-name]/filename.avro")
+        String getInput();
+
+        void setInput(String value);
+    }
+```
+
+The `ReadAvroOptions` interface defines the command-line argument `--input`, 
which specifies the path to the Avro file and sets the default value using the 
`@Default` annotation.
+
+```java
+        ReadAvroOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The `PipelineOptionsFactory` class parses the command-line arguments and 
creates a `ReadAvroOptions` instance. This instance is then used to create a 
pipeline with the provided pipeline options.
+
+```java
+        String fields = "[{ \"name\": \"months\", \"type\": \"string\" }, { 
\"name\": \"amount\", \"type\": \"string\" }]";
+        String schema = "{\"type\": \"record\", \"name\": 
\"AvroRecord\",\"fields\":" + fields + "}";
+        Schema avroSchema = Schema.parse(schema);
+```
+
+The AvroIO connector requires a schema to read Avro files. Hence, the schema 
is defined as a string and parsed into a `Schema` object.
+
+```java
+        p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+         .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+                GenericRecord record = c.element();
+                String message = "Month: " + record.get("month") + ". Amount: 
" + record.get("amount");
+                LOG.info(message);
+                c.output(message);
+            }
+        }))
+```
+
+In this segment, the `ParDo` transform processes each `GenericRecord` object 
from the Avro file. Each `GenericRecord` object is then parsed into a string 
and logged accordingly.
+
+```java
+        p.run();
+```
+
+Finally, the pipeline is executed to read the Avro file using the AvroIO 
connector, parse the `GenericRecord` objects, format them, and output the 
results.
diff --git a/learning/prompts/code-explanation/java/10_io_parquet.md 
b/learning/prompts/code-explanation/java/10_io_parquet.md
new file mode 100644
index 00000000000..0e5e84e3e43
--- /dev/null
+++ b/learning/prompts/code-explanation/java/10_io_parquet.md
@@ -0,0 +1,153 @@
+Prompt:
+What does this code do?
+
+```java
+package parquet;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadParquetFile {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadParquetFile.class);
+
+    public static class ExampleRecord implements Serializable {
+        public static final String COLUMN_ID = "id";
+        public static final String COLUMN_MONTH = "month";
+        public static final String COLUMN_AMOUNT = "amount";
+
+        private int id;
+        private String month;
+        private String amount;
+    }
+
+    public interface ReadParquetFileOptions extends PipelineOptions {
+        @Description("A glob file pattern to read Parquet files from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadParquetFileOptions options =
+            
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadParquetFileOptions.class);
+
+        Schema exampleRecordSchema = 
ReflectData.get().getSchema(ExampleRecord.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from Parquet file",
+                ParquetIO.read(exampleRecordSchema)
+                    .withAvroDataModel(GenericData.get())
+                    .from(options.getFilePattern()))
+            .apply(
+                "Log records",
+                ParDo.of(
+                    new DoFn<GenericRecord, GenericRecord>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) {
+                            GenericRecord record = 
Objects.requireNonNull(c.element());
+                            LOG.info(
+                                "Id = {}, Name = {}, Amount = {}",
+                                record.get(ExampleRecord.COLUMN_ID),
+                                record.get(ExampleRecord.COLUMN_MONTH),
+                                record.get(ExampleRecord.COLUMN_AMOUNT));
+                            c.output(record);
+                        }
+                    }));
+        p.run();
+    }
+}
+```
+
+Response:
+This code reads data from an Apache Parquet file using the `ParquetIO.read` 
transform from the ParquetIO connector provided by Apache Beam.
+
+The `ExampleRecord` class defines the schema of the data in the Parquet file.
+
+```java
+    public static class ExampleRecord implements Serializable {
+        public static final String COLUMN_ID = "id";
+        public static final String COLUMN_MONTH = "month";
+        public static final String COLUMN_AMOUNT = "amount";
+
+        private int id;
+        private String month;
+        private String amount;
+    }
+```
+
+The `ReadParquetFileOptions` class defines the command-line argument `--path`, 
which specifies the path to the Parquet file. This code utilizes the pipeline 
options pattern for parsing command-line arguments.
+
+```java
+    public interface ReadParquetFileOptions extends PipelineOptions {
+        @Description("A glob file pattern to read Parquet files from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+```
+
+The `PipelineOptionsFactory` class parses the command-line arguments and 
instantiates a `PipelineOptions` object. The `ReflectData` class defines the 
`Schema` object representing the Avro schema of the `ExampleRecord class`. The 
code snippet creates a `Pipeline` object using the `PipelineOptions` object.
+
+```java
+        ReadParquetFileOptions options =
+            
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadParquetFileOptions.class);
+
+        Schema exampleRecordSchema = 
ReflectData.get().getSchema(ExampleRecord.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The pipeline employs the `ParquetIO.read` transform to extract data from the 
Parquet file. The `withAvroDataModel` method specifies that the data should be 
decoded using the Avro data model, and the `from` method specifies the path to 
the Parquet file.
+
+```java
+        p.apply(
+                "Read from Parquet file",
+                ParquetIO.read(exampleRecordSchema)
+                    .withAvroDataModel(GenericData.get())
+                    .from(options.getFilePattern()))
+```
+
+Subsequently, the `ParDo` transform logs the records read from the Parquet 
file. The `DoFn` processes each element and logs the values of the fields in 
the `ExampleRecord` class.
+
+```java
+            .apply(
+                "Log records",
+                ParDo.of(
+                    new DoFn<GenericRecord, GenericRecord>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) {
+                            GenericRecord record = 
Objects.requireNonNull(c.element());
+                            LOG.info(
+                                "Id = {}, Name = {}, Amount = {}",
+                                record.get(ExampleRecord.COLUMN_ID),
+                                record.get(ExampleRecord.COLUMN_MONTH),
+                                record.get(ExampleRecord.COLUMN_AMOUNT));
+                            c.output(record);
+                        }
+                    }));
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the 
pipeline.
+
+```java
+        p.run();
+```
\ No newline at end of file
diff --git a/learning/prompts/code-generation/java/07_io_json.md 
b/learning/prompts/code-generation/java/07_io_json.md
new file mode 100644
index 00000000000..91fb75ce837
--- /dev/null
+++ b/learning/prompts/code-generation/java/07_io_json.md
@@ -0,0 +1,87 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JSON file using Apache 
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JSON files using the Apache Beam 
JsonIO connector.
+
+Here is an example of how to use the JsonIO connector to accomplish this:
+
+```java
+package gcs;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.json.JsonIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline for writing data to JSON files using the `JsonIO.write()` 
transform.
+public class WriteJsonIO {
+
+    // Represents an Example JSON record.
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class SampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount
+
+        public SampleRecord() {
+        }
+
+        public SampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+    }
+
+    // Pipeline options for writing to JSON files.
+    public interface WriteJsonOptions extends PipelineOptions {
+
+        @Description("A file path to write JSON files to")
+        @Validation.Required
+        String getFilePath();
+
+        // Set the file path.
+        void setFilePath(String filePath);
+    }
+
+    // Main entry point.
+    public static void main(String[] args) {
+
+        // Get the pipeline options.
+        WriteJsonOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteJsonOptions.class);
+
+        // Create the pipeline.
+        Pipeline p = Pipeline.create(options);
+
+        // Create a list of SampleRecord objects.
+        List<SampleRecord> rows =
+            Arrays.asList(
+                new SampleRecord(1, "January", "$1000"),
+                new SampleRecord(2, "February", "$2000"),
+                new SampleRecord(3, "March", "$3000"));
+
+        // Write the records to JSON files
+        p.apply("Create Records", Create.of(rows))
+            .apply(
+                "Write Records to JSON File",
+                    JsonIO.<SampleRecord>write(options.getFilePath())
+                        .withNumShards(1));
+        // Run the pipeline.
+        p.run();
+    }
+}
+```
+
+This code snippet utilizes the pipeline options pattern to parse command-line 
arguments.
diff --git a/learning/prompts/code-generation/java/08_io_csv.md 
b/learning/prompts/code-generation/java/08_io_csv.md
new file mode 100644
index 00000000000..f87c82aa0b9
--- /dev/null
+++ b/learning/prompts/code-generation/java/08_io_csv.md
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data in CSV format using Apache 
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to CSV files using the CsvIO 
connector. Here is an example of how to achieve this:
+
+```java
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.csv.CsvIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class WriteCsvFile {
+
+    // ExampleRecord is a POJO that represents the data to be written to the 
CSV file
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class ExampleRecord implements Serializable {
+        public int id;
+        public String month;
+        public String amount;
+
+        public ExampleRecord() {
+        }
+
+        public ExampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+    }
+
+    public interface WriteCsvFileOptions extends PipelineOptions {
+
+        @Description("A file path to write CSV files to")
+        @Validation.Required
+        String getFilePath();
+
+        void setFilePath(String filePath);
+    }
+
+    public static void main(String[] args) {
+        WriteCsvFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(WriteCsvFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        List<ExampleRecord> rows =
+            Arrays.asList(
+                new ExampleRecord(1, "January", "$1000"),
+                new ExampleRecord(2, "February", "$2000"),
+                new ExampleRecord(3, "March", "$3000"));
+
+        CSVFormat csvFormat =
+            CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache 
Beam")
+                .withCommentMarker('#');
+
+        p.apply("Create collection", Create.of(rows))
+            .apply(
+                "Write to CSV file",
+                    CsvIO.<ExampleRecord>write()
+                        .to(options.getFilePath())
+                        .withNumShards(1));
+        p.run();
+    }
+}
+```
diff --git a/learning/prompts/code-generation/java/09_io_avro.md 
b/learning/prompts/code-generation/java/09_io_avro.md
new file mode 100644
index 00000000000..80c35034657
--- /dev/null
+++ b/learning/prompts/code-generation/java/09_io_avro.md
@@ -0,0 +1,68 @@
+Prompt:
+Write a sample Java code snippet that reads data from an Avro file using 
Apache Beam.
+
+Response:
+Your Apache Beam pipeline can read data from Apache Avro files using the 
AvroIO connector. Here is an example of how to achieve this:
+
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.avro.AvroIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadGenericAvroIO {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadGenericAvroIO.class);
+
+    // Define pipeline options for the pipeline
+    public interface ReadAvroOptions extends PipelineOptions {
+        @Description("Path to the Avro file to read from")
+        
@Default.String("gs://<your-bucket-name]/[your-object-name]/filename.avro")
+        String getInput();
+
+        void setInput(String value);
+    }
+
+    public static void main(String[] args) {
+
+        // Create pipeline options to parse command-line arguments
+        ReadAvroOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+        // Create a pipeline
+        Pipeline p = Pipeline.create(options);
+
+        // Declare the schema of the Avro file
+        String fields = "[{ \"name\": \"months\", \"type\": \"string\" }, { 
\"name\": \"amount\", \"type\": \"string\" }]";
+        String schema = "{\"type\": \"record\", \"name\": 
\"AvroRecord\",\"fields\":" + fields + "}";
+
+        // Parse the schema string into a Schema object
+        Schema avroSchema = new Schema.Parser().parse(schema);
+
+        // Read the Avro file using the AvroIO connector
+        p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+        // Parse the GenericRecord object into a string and log it
+         .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+                GenericRecord record = c.element();
+                String message = "Month: " + record.get("month") + ". Amount: 
" + record.get("amount");
+                LOG.info(message);
+                c.output(message);
+            }
+        }));
+
+        // Execute the pipeline
+        p.run();
+    }
+}
+```
+
+The `ReadAvroOptions` interface is used to define the command-line argument 
`--input`, which specifies the path to the Avro file. This code uses pipeline 
options to define the required argument.
diff --git a/learning/prompts/code-generation/java/10_io_parquet.md 
b/learning/prompts/code-generation/java/10_io_parquet.md
new file mode 100644
index 00000000000..45c3f2d0b7b
--- /dev/null
+++ b/learning/prompts/code-generation/java/10_io_parquet.md
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Parquet file using 
Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Apache Parquet files using the 
ParquetIO connector. Here is an example of how to achieve this:
+
+```java
+package parquet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+public class WriteSampleParquetFile {
+
+    // Represents a DTO to be written to Parquet.
+    public static class SampleRecord implements Serializable {
+        public static final String COLUMN_ID = "id";
+        public static final String COLUMN_MONTH = "month";
+        public static final String COLUMN_AMOUNT = "amount";
+
+        private int id;
+        private String month;
+        private String amount;
+
+        public SampleRecord() {}
+
+        public SampleRecord(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getMonth() {
+            return month;
+        }
+
+        public String getAmount() {
+            return amount;
+        }
+    }
+
+    // Pipeline options for writing to Parquet files.
+    public interface WriteSampleParquetFileOptions extends PipelineOptions {
+        @Description("A file path to write sample Parquet files to")
+        @Validation.Required
+        String getPath();
+
+        void setPath(String path);
+    }
+
+    public static void main(String[] args) {
+        WriteSampleParquetFileOptions options =
+            
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteSampleParquetFileOptions.class);
+
+        // Get the Avro schema for the SampleRecord object.
+        Schema sampleRecordSchema = 
ReflectData.get().getSchema(SampleRecord.class);
+
+        // Create a pipeline.
+        Pipeline p = Pipeline.create(options);
+
+        // Create a list of SampleRecord objects.
+        List<SampleRecord> rows =
+            Arrays.asList(
+                new SampleRecord(1, "January", "$1000"),
+                new SampleRecord(2, "February", "$2000"),
+                new SampleRecord(3, "March", "$3000"));
+        // Apply the Create transform to the pipeline to create a PCollection 
from the list of SampleRecord objects.
+        p.apply("Create", Create.of(rows))
+            // Apply the MapElements transform to the pipeline to map the 
SampleRecord objects to GenericRecord objects.
+            .apply(
+                "Map Sample record to GenericRecord",
+                MapElements.via(new 
MapSampleRecordToGenericRecord(sampleRecordSchema)))
+            // Set the coder for the GenericRecord objects to AvroCoder.
+            .setCoder(AvroCoder.of(sampleRecordSchema))
+            // Apply the FileIO.write() transform to the pipeline to write the 
GenericRecord objects to a Parquet file.
+            .apply(
+                "Write Parquet file",
+                FileIO.<GenericRecord>write()
+                    .via(ParquetIO.sink(sampleRecordSchema))
+                    .to(options.getPath()));
+        // Run the pipeline.
+        p.run();
+    }
+
+    // A SimpleFunction that maps SampleRecord objects to GenericRecord 
objects.
+    private static class MapSampleRecordToGenericRecord
+      extends SimpleFunction<SampleRecord, GenericRecord> {
+
+        private final Schema schema;
+
+        public MapSampleRecordToGenericRecord(Schema schema) {
+          this.schema = schema;
+        }
+
+        // Maps a SampleRecord object to a GenericRecord object.
+        @Override
+        public GenericRecord apply(SampleRecord input) {
+            GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+
+            builder
+                .set(SampleRecord.COLUMN_MONTH, input.getMonth())
+                .set(SampleRecord.COLUMN_AMOUNT, input.getAmount())
+                .set(SampleRecord.COLUMN_ID, input.getId());
+            return builder.build();
+        }
+    }
+}
+```
+
+The `WriteSampleParquetFileOptions` class is utilized to define the 
command-line argument `--path`, specifying the path where the Parquet file 
should be written. This code uses pipeline options to define the required 
`path` argument.
\ No newline at end of file


Reply via email to