This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d7eaaf07191 Duet AI code-explanation and code-generation prompts basic
(#30686)
d7eaaf07191 is described below
commit d7eaaf07191ef0737df5c37daeb597424d5a865b
Author: Oleh Borysevych <[email protected]>
AuthorDate: Fri Mar 22 16:11:07 2024 +0100
Duet AI code-explanation and code-generation prompts basic (#30686)
* parquet added
* csv and avro added
* Apply suggestions from code review
Co-authored-by: Daria Bezkorovaina
<[email protected]>
* whitespace fixed
* Apply suggestions from code review
Co-authored-by: Daria Bezkorovaina
<[email protected]>
---------
Co-authored-by: Daria Bezkorovaina
<[email protected]>
---
.../prompts/code-explanation/java/07_io_json.md | 138 +++++++++++++++++++
.../prompts/code-explanation/java/08_io_csv.md | 138 +++++++++++++++++++
.../prompts/code-explanation/java/09_io_avro.md | 92 +++++++++++++
.../prompts/code-explanation/java/10_io_parquet.md | 153 +++++++++++++++++++++
.../prompts/code-generation/java/07_io_json.md | 87 ++++++++++++
learning/prompts/code-generation/java/08_io_csv.md | 78 +++++++++++
.../prompts/code-generation/java/09_io_avro.md | 68 +++++++++
.../prompts/code-generation/java/10_io_parquet.md | 130 +++++++++++++++++
8 files changed, 884 insertions(+)
diff --git a/learning/prompts/code-explanation/java/07_io_json.md
b/learning/prompts/code-explanation/java/07_io_json.md
new file mode 100644
index 00000000000..42a6c7a708a
--- /dev/null
+++ b/learning/prompts/code-explanation/java/07_io_json.md
@@ -0,0 +1,138 @@
+Prompt:
+What does this code do?
+
+```java
+package gcs;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.json.JsonIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class WriteJsonIO {
+
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class SampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public SampleRecord() {
+ }
+
+ public SampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+
+ public interface WriteJsonOptions extends PipelineOptions {
+
+ @Description("A file path to write JSON files to")
+ @Validation.Required
+ String getFilePath();
+
+ void setFilePath(String filePath);
+ }
+
+ public static void main(String[] args) {
+
+ WriteJsonOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteJsonOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ List<SampleRecord> rows =
+ Arrays.asList(
+ new SampleRecord(1, "January", "$1000"),
+ new SampleRecord(2, "February", "$2000"),
+ new SampleRecord(3, "March", "$3000"));
+
+ p.apply("Create Records", Create.of(rows))
+ .apply(
+ "Write Records to JSON File",
+ JsonIO.<SampleRecord>write(options.getFilePath())
+ .withNumShards(1));
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam JsonIO connector to write a list of
`SampleRecord` objects to a JSON file.
+
+The `SampleRecord` class defines the schema for the JSON records and is
annotated with `@DefaultSchema(JavaFieldSchema.class)` to indicate that its
fields should be used as the schema for the JSON records.
+
+```java
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class SampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public SampleRecord() {
+ }
+
+ public SampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+```
+
+The `WriteJsonOptions` interface specifies command-line options for
configuring the output file path. The `@Description` annotation provides a
description of the option, and `@Validation.Required` indicates that the option
is required.
+
+```java
+public interface WriteJsonOptions extends PipelineOptions {
+
+ @Description("A file path to write JSON files to")
+ @Validation.Required
+ String getFilePath();
+
+ void setFilePath(String filePath);
+ }
+```
+
+The `PipelineOptionsFactory` class generates `PipelineOptions` from
command-line arguments. The `Pipeline.create` method creates a data pipeline,
defined as a sequence of transformations to be applied to the data.
+
+```java
+ WriteJsonOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteJsonOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The pipeline generates a list of `SampleRecord` objects and applies the
`Create` transform to create a `PCollection` from this list. Subsequently, the
`JsonIO.write` transform is used to write the `PCollection` to a JSON file,
with the `withNumShards` method specifying the number of output shards.
+
+```java
+ List<SampleRecord> rows =
+ Arrays.asList(
+ new SampleRecord(1, "January", "$1000"),
+ new SampleRecord(2, "February", "$2000"),
+ new SampleRecord(3, "March", "$3000"));
+
+ p.apply("Create Records", Create.of(rows))
+ .apply(
+ "Write Records to JSON File",
+ JsonIO.<SampleRecord>write(options.getFilePath())
+ .withNumShards(1));
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the
pipeline.
+
+```java
+ p.run();
+```
diff --git a/learning/prompts/code-explanation/java/08_io_csv.md
b/learning/prompts/code-explanation/java/08_io_csv.md
new file mode 100644
index 00000000000..567d82c0c10
--- /dev/null
+++ b/learning/prompts/code-explanation/java/08_io_csv.md
@@ -0,0 +1,138 @@
+Prompt:
+What does this code do?
+
+```java
+public class WriteCsvFile {
+
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class ExampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public ExampleRecord() {
+ }
+
+ public ExampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+
+ public interface WriteCsvFileOptions extends PipelineOptions {
+
+ @Description("A file path to write CSV files to")
+ @Validation.Required
+ String getFilePath();
+
+ void setFilePath(String filePath);
+ }
+
+ public static void main(String[] args) {
+ WriteCsvFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteCsvFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ List<ExampleRecord> rows =
+ Arrays.asList(
+ new ExampleRecord(1, "January", "$1000"),
+ new ExampleRecord(2, "February", "$2000"),
+ new ExampleRecord(3, "March", "$3000"));
+
+ CSVFormat csvFormat =
+ CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache
Beam")
+ .withCommentMarker('#');
+
+ p.apply("Create collection", Create.of(rows))
+ .apply(
+ "Write to CSV file",
+ CsvIO.<ExampleRecord>write(options.getFilePath(),
csvFormat)
+ .withNumShards(1));
+ p.run();
+ }
+}
+
+```
+
+Response:
+This code snippet writes data to a CSV file using the `CsvIO.write` transform
from the CsvIO connector provided by Apache Beam.
+
+```java
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class ExampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public ExampleRecord() {
+ }
+
+ public ExampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+```
+
+The `ExampleRecord` class represents a row in the CSV file, defining three
fields: `id`, `month`, and `amount`. The
`@DefaultSchema(JavaFieldSchema.class)` annotation instructs the Apache Beam
SDK to use the `JavaFieldSchema` class to generate the schema for the
`ExampleRecord` class.
+
+```java
+public interface WriteCsvFileOptions extends PipelineOptions {
+
+ @Description("A file path to write CSV files to")
+ @Validation.Required
+ String getFilePath();
+
+ void setFilePath(String filePath);
+ }
+```
+
+The `WriteCsvFileOptions` interface defines a custom option for specifying the
file path to write the CSV files to.
+
+```java
+ WriteCsvFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteCsvFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `Pipeline.create` method creates a data pipeline using the options defined
in the `WriteCsvFileOptions` class.
+
+```java
+ List<ExampleRecord> rows =
+ Arrays.asList(
+ new ExampleRecord(1, "January", "$1000"),
+ new ExampleRecord(2, "February", "$2000"),
+ new ExampleRecord(3, "March", "$3000"));
+```
+
+Subsequently, the code snippet creates a list of `ExampleRecord` objects to be
written to the CSV file.
+
+```java
+ CSVFormat csvFormat =
+ CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache
Beam")
+ .withCommentMarker('#');
+```
+
+To write the data to a CSV file, the pipeline creates a `CSVFormat` object
with a header comment and a comment marker.
+
+```java
+ p.apply("Create collection", Create.of(rows))
+ .apply(
+ "Write to CSV file",
+ CsvIO.<ExampleRecord>write(options.getFilePath(),
csvFormat)
+ .withNumShards(1));
+```
+
+The code applies the `Create` transform to generate a collection of
`ExampleRecord` objects. Then, the `CsvIO.write` transform is applied to write
the collection to a CSV file, with the `withNumShards` method specifying the
number of shards to use when writing the file.
+
+```java
+ p.run();
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the
pipeline.
diff --git a/learning/prompts/code-explanation/java/09_io_avro.md
b/learning/prompts/code-explanation/java/09_io_avro.md
new file mode 100644
index 00000000000..569633f4167
--- /dev/null
+++ b/learning/prompts/code-explanation/java/09_io_avro.md
@@ -0,0 +1,92 @@
+Prompt:
+What does this code do?
+
+```java
+public class ReadGenericAvroIO {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadGenericAvroIO.class);
+
+ public interface ReadAvroOptions extends PipelineOptions {
+ @Description("Path to the Avro file to read from")
+
@Default.String("gs://[your-bucket-name]/[your-object-name]/filename.avro")
+ String getInput();
+
+ void setInput(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ String fields = "[{ \"name\": \"month\", \"type\": \"string\" }, {
\"name\": \"amount\", \"type\": \"string\" }]";
+ String schema = "{\"type\": \"record\", \"name\":
\"AvroRecord\",\"fields\":" + fields + "}";
+ Schema avroSchema = new Schema.Parser().parse(schema);
+
+ p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+ .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord record = c.element();
+ String message = "Month: " + record.get("month") + ". Amount:
" + record.get("amount");
+ LOG.info(message);
+ c.output(message);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code reads data from an Apache Avro file using the
`AvroIO.readGenericRecords` transform from the AvroIO connector provided by Apache Beam.
+
+```java
+ public interface ReadAvroOptions extends PipelineOptions {
+ @Description("Path to the Avro file to read from")
+
@Default.String("gs://[your-bucket-name]/[your-object-name]/filename.avro")
+ String getInput();
+
+ void setInput(String value);
+ }
+```
+
+The `ReadAvroOptions` interface defines the command-line argument `--input`,
which specifies the path to the Avro file and sets the default value using the
`@Default` annotation.
+
+```java
+ ReadAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `PipelineOptionsFactory` class parses the command-line arguments and
creates a `ReadAvroOptions` instance. This instance is then used to create a
pipeline with the provided pipeline options.
+
+```java
+ String fields = "[{ \"name\": \"month\", \"type\": \"string\" }, {
\"name\": \"amount\", \"type\": \"string\" }]";
+ String schema = "{\"type\": \"record\", \"name\":
\"AvroRecord\",\"fields\":" + fields + "}";
+ Schema avroSchema = new Schema.Parser().parse(schema);
+```
+
+The AvroIO connector requires a schema to read Avro files. Hence, the schema
is defined as a string and parsed into a `Schema` object.
+
+```java
+ p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+ .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord record = c.element();
+ String message = "Month: " + record.get("month") + ". Amount:
" + record.get("amount");
+ LOG.info(message);
+ c.output(message);
+ }
+ }))
+```
+
+In this segment, the `ParDo` transform processes each `GenericRecord` object
from the Avro file. Each `GenericRecord` object is then parsed into a string
and logged accordingly.
+
+```java
+ p.run();
+```
+
+Finally, the pipeline is executed to read the Avro file using the AvroIO
connector, parse the `GenericRecord` objects, format them, and output the
results.
diff --git a/learning/prompts/code-explanation/java/10_io_parquet.md
b/learning/prompts/code-explanation/java/10_io_parquet.md
new file mode 100644
index 00000000000..0e5e84e3e43
--- /dev/null
+++ b/learning/prompts/code-explanation/java/10_io_parquet.md
@@ -0,0 +1,153 @@
+Prompt:
+What does this code do?
+
+```java
+package parquet;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadParquetFile {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadParquetFile.class);
+
+ public static class ExampleRecord implements Serializable {
+ public static final String COLUMN_ID = "id";
+ public static final String COLUMN_MONTH = "month";
+ public static final String COLUMN_AMOUNT = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+ }
+
+ public interface ReadParquetFileOptions extends PipelineOptions {
+ @Description("A glob file pattern to read Parquet files from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadParquetFileOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadParquetFileOptions.class);
+
+ Schema exampleRecordSchema =
ReflectData.get().getSchema(ExampleRecord.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Parquet file",
+ ParquetIO.read(exampleRecordSchema)
+ .withAvroDataModel(GenericData.get())
+ .from(options.getFilePattern()))
+ .apply(
+ "Log records",
+ ParDo.of(
+ new DoFn<GenericRecord, GenericRecord>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord record =
Objects.requireNonNull(c.element());
+ LOG.info(
+ "Id = {}, Month = {}, Amount = {}",
+ record.get(ExampleRecord.COLUMN_ID),
+ record.get(ExampleRecord.COLUMN_MONTH),
+ record.get(ExampleRecord.COLUMN_AMOUNT));
+ c.output(record);
+ }
+ }));
+ p.run();
+ }
+}
+```
+
+Response:
+This code reads data from an Apache Parquet file using the `ParquetIO.read`
transform from the ParquetIO connector provided by Apache Beam.
+
+The `ExampleRecord` class defines the schema of the data in the Parquet file.
+
+```java
+ public static class ExampleRecord implements Serializable {
+ public static final String COLUMN_ID = "id";
+ public static final String COLUMN_MONTH = "month";
+ public static final String COLUMN_AMOUNT = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+ }
+```
+
+The `ReadParquetFileOptions` interface defines the command-line argument
`--filePattern`, which specifies a glob pattern of Parquet files to read. This
code utilizes the pipeline options pattern for parsing command-line arguments.
+
+```java
+ public interface ReadParquetFileOptions extends PipelineOptions {
+ @Description("A glob file pattern to read Parquet files from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+```
+
+The `PipelineOptionsFactory` class parses the command-line arguments and
instantiates a `PipelineOptions` object. The `ReflectData` class defines the
`Schema` object representing the Avro schema of the `ExampleRecord` class. The
code snippet creates a `Pipeline` object using the `PipelineOptions` object.
+
+```java
+ ReadParquetFileOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadParquetFileOptions.class);
+
+ Schema exampleRecordSchema =
ReflectData.get().getSchema(ExampleRecord.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The pipeline employs the `ParquetIO.read` transform to extract data from the
Parquet file. The `withAvroDataModel` method specifies that the data should be
decoded using the Avro data model, and the `from` method specifies the path to
the Parquet file.
+
+```java
+ p.apply(
+ "Read from Parquet file",
+ ParquetIO.read(exampleRecordSchema)
+ .withAvroDataModel(GenericData.get())
+ .from(options.getFilePattern()))
+```
+
+Subsequently, the `ParDo` transform logs the records read from the Parquet
file. The `DoFn` processes each element and logs the values of the fields in
the `ExampleRecord` class.
+
+```java
+ .apply(
+ "Log records",
+ ParDo.of(
+ new DoFn<GenericRecord, GenericRecord>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord record =
Objects.requireNonNull(c.element());
+ LOG.info(
"Id = {}, Month = {}, Amount = {}",
+ record.get(ExampleRecord.COLUMN_ID),
+ record.get(ExampleRecord.COLUMN_MONTH),
+ record.get(ExampleRecord.COLUMN_AMOUNT));
+ c.output(record);
+ }
+ }));
+```
+
+Finally, the code snippet invokes the `Pipeline.run` method to execute the
pipeline.
+
+```java
+ p.run();
+```
\ No newline at end of file
diff --git a/learning/prompts/code-generation/java/07_io_json.md
b/learning/prompts/code-generation/java/07_io_json.md
new file mode 100644
index 00000000000..91fb75ce837
--- /dev/null
+++ b/learning/prompts/code-generation/java/07_io_json.md
@@ -0,0 +1,87 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JSON file using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JSON files using the Apache Beam
JsonIO connector.
+
+Here is an example of how to use the JsonIO connector to accomplish this:
+
+```java
+package gcs;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.json.JsonIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline for writing data to JSON files using the `JsonIO.write()`
transform.
+public class WriteJsonIO {
+
+ // Represents an Example JSON record.
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class SampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public SampleRecord() {
+ }
+
+ public SampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+
+ // Pipeline options for writing to JSON files.
+ public interface WriteJsonOptions extends PipelineOptions {
+
+ @Description("A file path to write JSON files to")
+ @Validation.Required
+ String getFilePath();
+
+ // Set the file path.
+ void setFilePath(String filePath);
+ }
+
+ // Main entry point.
+ public static void main(String[] args) {
+
+ // Get the pipeline options.
+ WriteJsonOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteJsonOptions.class);
+
+ // Create the pipeline.
+ Pipeline p = Pipeline.create(options);
+
+ // Create a list of SampleRecord objects.
+ List<SampleRecord> rows =
+ Arrays.asList(
+ new SampleRecord(1, "January", "$1000"),
+ new SampleRecord(2, "February", "$2000"),
+ new SampleRecord(3, "March", "$3000"));
+
+ // Write the records to JSON files
+ p.apply("Create Records", Create.of(rows))
+ .apply(
+ "Write Records to JSON File",
+ JsonIO.<SampleRecord>write(options.getFilePath())
+ .withNumShards(1));
+ // Run the pipeline.
+ p.run();
+ }
+}
+```
+
+This code snippet utilizes the pipeline options pattern to parse command-line
arguments.
diff --git a/learning/prompts/code-generation/java/08_io_csv.md
b/learning/prompts/code-generation/java/08_io_csv.md
new file mode 100644
index 00000000000..f87c82aa0b9
--- /dev/null
+++ b/learning/prompts/code-generation/java/08_io_csv.md
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data in CSV format using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to CSV files using the CsvIO
connector. Here is an example of how to achieve this:
+
+```java
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.csv.CsvIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class WriteCsvFile {
+
+ // ExampleRecord is a POJO that represents the data to be written to the
CSV file
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class ExampleRecord implements Serializable {
+ public int id;
+ public String month;
+ public String amount;
+
+ public ExampleRecord() {
+ }
+
+ public ExampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+ }
+
+ public interface WriteCsvFileOptions extends PipelineOptions {
+
+ @Description("A file path to write CSV files to")
+ @Validation.Required
+ String getFilePath();
+
+ void setFilePath(String filePath);
+ }
+
+ public static void main(String[] args) {
+ WriteCsvFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(WriteCsvFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ List<ExampleRecord> rows =
+ Arrays.asList(
+ new ExampleRecord(1, "January", "$1000"),
+ new ExampleRecord(2, "February", "$2000"),
+ new ExampleRecord(3, "March", "$3000"));
+
+ CSVFormat csvFormat =
+ CSVFormat.DEFAULT.withHeaderComments("CSV file created by Apache
Beam")
+ .withCommentMarker('#');
+
+ p.apply("Create collection", Create.of(rows))
+ .apply(
+ "Write to CSV file",
+ CsvIO.<ExampleRecord>write(options.getFilePath(), csvFormat)
+ .withNumShards(1));
+ p.run();
+ }
+}
+```
diff --git a/learning/prompts/code-generation/java/09_io_avro.md
b/learning/prompts/code-generation/java/09_io_avro.md
new file mode 100644
index 00000000000..80c35034657
--- /dev/null
+++ b/learning/prompts/code-generation/java/09_io_avro.md
@@ -0,0 +1,68 @@
+Prompt:
+Write a sample Java code snippet that reads data from an Avro file using
Apache Beam.
+
+Response:
+Your Apache Beam pipeline can read data from Apache Avro files using the
AvroIO connector. Here is an example of how to achieve this:
+
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.avro.AvroIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadGenericAvroIO {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadGenericAvroIO.class);
+
+ // Define pipeline options for the pipeline
+ public interface ReadAvroOptions extends PipelineOptions {
+ @Description("Path to the Avro file to read from")
+
@Default.String("gs://[your-bucket-name]/[your-object-name]/filename.avro")
+ String getInput();
+
+ void setInput(String value);
+ }
+
+ public static void main(String[] args) {
+
+ // Create pipeline options to parse command-line arguments
+ ReadAvroOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadAvroOptions.class);
+
+ // Create a pipeline
+ Pipeline p = Pipeline.create(options);
+
+ // Declare the schema of the Avro file
+ String fields = "[{ \"name\": \"month\", \"type\": \"string\" }, {
\"name\": \"amount\", \"type\": \"string\" }]";
+ String schema = "{\"type\": \"record\", \"name\":
\"AvroRecord\",\"fields\":" + fields + "}";
+
+ // Parse the schema string into a Schema object
+ Schema avroSchema = new Schema.Parser().parse(schema);
+
+ // Read the Avro file using the AvroIO connector
+ p.apply(AvroIO.readGenericRecords(avroSchema).from(options.getInput()))
+ // Parse the GenericRecord object into a string and log it
+ .apply("Parse and log", ParDo.of(new DoFn<GenericRecord, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ GenericRecord record = c.element();
+ String message = "Month: " + record.get("month") + ". Amount:
" + record.get("amount");
+ LOG.info(message);
+ c.output(message);
+ }
+ }));
+
+ // Execute the pipeline
+ p.run();
+ }
+}
+```
+
+The `ReadAvroOptions` interface is used to define the command-line argument
`--input`, which specifies the path to the Avro file. This code uses pipeline
options to define the required argument.
diff --git a/learning/prompts/code-generation/java/10_io_parquet.md
b/learning/prompts/code-generation/java/10_io_parquet.md
new file mode 100644
index 00000000000..45c3f2d0b7b
--- /dev/null
+++ b/learning/prompts/code-generation/java/10_io_parquet.md
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Parquet file using
Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Apache Parquet files using the
ParquetIO connector. Here is an example of how to achieve this:
+
+```java
+package parquet;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+public class WriteSampleParquetFile {
+
+ // Represents a DTO to be written to Parquet.
+ public static class SampleRecord implements Serializable {
+ public static final String COLUMN_ID = "id";
+ public static final String COLUMN_MONTH = "month";
+ public static final String COLUMN_AMOUNT = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRecord() {}
+
+ public SampleRecord(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getMonth() {
+ return month;
+ }
+
+ public String getAmount() {
+ return amount;
+ }
+ }
+
+ // Pipeline options for writing to Parquet files.
+ public interface WriteSampleParquetFileOptions extends PipelineOptions {
+ @Description("A file path to write sample Parquet files to")
+ @Validation.Required
+ String getPath();
+
+ void setPath(String path);
+ }
+
+ public static void main(String[] args) {
+ WriteSampleParquetFileOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteSampleParquetFileOptions.class);
+
+ // Get the Avro schema for the SampleRecord object.
+ Schema sampleRecordSchema =
ReflectData.get().getSchema(SampleRecord.class);
+
+ // Create a pipeline.
+ Pipeline p = Pipeline.create(options);
+
+ // Create a list of SampleRecord objects.
+ List<SampleRecord> rows =
+ Arrays.asList(
+ new SampleRecord(1, "January", "$1000"),
+ new SampleRecord(2, "February", "$2000"),
+ new SampleRecord(3, "March", "$3000"));
+ // Apply the Create transform to the pipeline to create a PCollection
from the list of SampleRecord objects.
+ p.apply("Create", Create.of(rows))
+ // Apply the MapElements transform to the pipeline to map the
SampleRecord objects to GenericRecord objects.
+ .apply(
+ "Map Sample record to GenericRecord",
+ MapElements.via(new
MapSampleRecordToGenericRecord(sampleRecordSchema)))
+ // Set the coder for the GenericRecord objects to AvroCoder.
+ .setCoder(AvroCoder.of(sampleRecordSchema))
+ // Apply the FileIO.write() transform to the pipeline to write the
GenericRecord objects to a Parquet file.
+ .apply(
+ "Write Parquet file",
+ FileIO.<GenericRecord>write()
+ .via(ParquetIO.sink(sampleRecordSchema))
+ .to(options.getPath()));
+ // Run the pipeline.
+ p.run();
+ }
+
+ // A SimpleFunction that maps SampleRecord objects to GenericRecord
objects.
+ private static class MapSampleRecordToGenericRecord
+ extends SimpleFunction<SampleRecord, GenericRecord> {
+
+ private final Schema schema;
+
+ public MapSampleRecordToGenericRecord(Schema schema) {
+ this.schema = schema;
+ }
+
+ // Maps a SampleRecord object to a GenericRecord object.
+ @Override
+ public GenericRecord apply(SampleRecord input) {
+ GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+
+ builder
+ .set(SampleRecord.COLUMN_MONTH, input.getMonth())
+ .set(SampleRecord.COLUMN_AMOUNT, input.getAmount())
+ .set(SampleRecord.COLUMN_ID, input.getId());
+ return builder.build();
+ }
+ }
+}
+```
+
+The `WriteSampleParquetFileOptions` interface is utilized to define the
command-line argument `--path`, specifying the path where the Parquet file
should be written. This code uses pipeline options to define the required
`path` argument.
\ No newline at end of file