dariabezkorovaina commented on code in PR #30785:
URL: https://github.com/apache/beam/pull/30785#discussion_r1543168217
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka
topic.
Review Comment:
```suggestion
This code snippet uses the Apache Beam KafkaIO connector to read messages
from a Kafka topic.
```
##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from
TFRecord files.
Review Comment:
```suggestion
This code snippet uses the Apache Beam TFRecordIO connector to read messages
from TFRecord files.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
Review Comment:
```suggestion
Response:
This code reads data from a Java Database Connectivity (JDBC)-enabled
database using the Apache Beam JdbcIO connector.
```
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka
topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap
server and topic name. The `@Description` annotation provides a description of
the option. Use `--bootstrapServer` and `--topicName` command line arguments to
set the bootstrap server and topic name when running the pipeline.
+```java
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+```
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadKafkaTopicOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
+```java
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+The pipeline is then used to read messages from the Kafka topic. The
`KafkaIO.read()` method is used to configure the Kafka source with the
bootstrap server, topic name, key and value deserializers. The `ParDo`
transform is applied to log the message key and value.
Review Comment:
```suggestion
The pipeline reads messages from the specified Kafka topic using
`KafkaIO.read()`. This method configures the Kafka source by providing the
bootstrap server, topic name, key deserializer, and value deserializer. Then,
the pipeline applies a `ParDo` transform to log the Kafka message key and value.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadFromJdbcPartitionOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options. The
`DataSourceConfiguration` is created using the options for the JDBC source.
Review Comment:
```suggestion
In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadFromJdbcPartitionOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options. The
`DataSourceConfiguration` class represents a configuration object that
encapsulates the necessary information for establishing the connection to the
JDBC source.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
Review Comment:
```suggestion
@Description("JDBC driver class name")
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadFromJdbcPartitionOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options. The
`DataSourceConfiguration` is created using the options for the JDBC source.
+
+```java
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `JdbcIO.readWithPartitions` method is used to read data from the JDBC
source in parallel. The `withNumPartitions` method specifies the number of
partitions to use. The `withPartitionColumn` method specifies the column to use
for partitioning the data. The `withLowerBound` and `withUpperBound` methods
specify the lower and upper bounds of the data to read.
+
+```java
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+```
+
+Finally, the pipeline is executed using the `run` method.
Review Comment:
```suggestion
Finally, the `run` method executes the pipeline.
```
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka
topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap
server and topic name. The `@Description` annotation provides a description of
the option. Use `--bootstrapServer` and `--topicName` command line arguments to
set the bootstrap server and topic name when running the pipeline.
Review Comment:
```suggestion
The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server
and topic name. The `@Description` annotation provides a description of each
option. When running the pipeline, use the command-line arguments
`--bootstrapServer` and `--topicName` to set these parameters.
```
##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from
TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set
when running the pipeline. In this case, it specifies the glob file pattern to
read TFRecords from. Use the `--filePattern` command line argument to set the
file pattern when running the pipeline. The `@Description` annotation provides
a description of the option, and the `@Validation.Required` annotation ensures
that the option is required.
+```java
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadTFRecordFileOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
Review Comment:
```suggestion
In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadTFRecordFileOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
```
##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from
TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set
when running the pipeline. In this case, it specifies the glob file pattern to
read TFRecords from. Use the `--filePattern` command line argument to set the
file pattern when running the pipeline. The `@Description` annotation provides
a description of the option, and the `@Validation.Required` annotation ensures
that the option is required.
+```java
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+```
+
+In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadTFRecordFileOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
+```java
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `TFRecordIO.read` method is used to create a `TFRecordIO` object. The
`from` method specifies the glob file pattern to read TFRecords from. The
`withCompression` method specifies the compression type to use when reading the
TFRecords. In this case, the compression type is set to `UNCOMPRESSED`. This
means that the TFRecords are not compressed. This is the default compression
type. The `apply` method is used to apply the transformation to the pipeline.
Review Comment:
```suggestion
The `TFRecordIO.read` method configures the TFRecord input operations. It
specifies the file pattern for TFRecord files using the `from` method and
defines the compression type as `UNCOMPRESSED` using the `withCompression`
method. `UNCOMPRESSED` is the default compression type that ensures the
TFRecords are read without compression. The `apply` method then applies the
transformation to the pipeline.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from
TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set
when running the pipeline. In this case, it specifies the glob file pattern to
read TFRecords from. Use the `--filePattern` command line argument to set the
file pattern when running the pipeline. The `@Description` annotation provides
a description of the option, and the `@Validation.Required` annotation ensures
that the option is required.
+```java
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+```
+
+In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadTFRecordFileOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
+```java
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `TFRecordIO.read` method configures the TFRecord input operations. It
specifies the file pattern for TFRecord files using the `from` method and
defines the compression type as `UNCOMPRESSED` using the `withCompression`
method. `UNCOMPRESSED` is the default compression type that ensures the
TFRecords are read without compression. The `apply` method then applies the
transformation to the pipeline.
+```java
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+```
+Finally, the pipeline is executed using the `run` method.
Review Comment:
```suggestion
Finally, the `run` method executes the pipeline.
```
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka
topic.
+
+The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server
and topic name. The `@Description` annotation provides a description of each
option. When running the pipeline, use the command-line arguments
`--bootstrapServer` and `--topicName` to set these parameters.
+```java
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+```
+In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadKafkaTopicOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
+```java
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+```
+The pipeline then reads messages from the Kafka topic. The `KafkaIO.read()`
method configures the Kafka source with the bootstrap server, topic name, and
key and value deserializers. The `ParDo` transform then logs the message key
and value.
+```java
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+```
+Finally, the pipeline is executed using the `run` method.
Review Comment:
```suggestion
Finally, the `run` method executes the pipeline.
```
##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+ public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+ @Description("A glob file pattern to read TFRecords from")
+ @Validation.Required
+ String getFilePattern();
+
+ void setFilePattern(String filePattern);
+ }
+
+ public static void main(String[] args) {
+ ReadTFRecordFileOptions options =
+ PipelineOptionsFactory.fromArgs(args)
+ .withValidation().as(ReadTFRecordFileOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from TFRecord file",
+ TFRecordIO.read()
+ .from(options.getFilePattern())
+ .withCompression(Compression.UNCOMPRESSED))
+ .apply(
+ "Convert and log",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c) {
+ String output =
+ new String(c.element(), Charsets.UTF_8);
+ LOG.info("Output: {}", output);
+ c.output(output);
+ }
+ }));
+
+ p.run();
+ }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from
TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set
when running the pipeline. In this case, it specifies the glob file pattern to
read TFRecords from. Use `--filePattern` command line argument to set the file
pattern when running the pipeline. The `@Description` annotation provides a
description of the option, and `@Validation.Required` ensures that the option
is required.
Review Comment:
```suggestion
The `ReadTFRecordFileOptions` interface defines the options that can be set
when running the pipeline. In this case, it specifies the glob file pattern to
read TFRecords from. Use the `--filePattern` command line argument to set the
file pattern when running the pipeline. The `@Description` annotation provides
a description of the option, and the `@Validation.Required` annotation ensures
that the option is required.
```
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka
topic.
+
+The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server
and topic name. The `@Description` annotation provides a description of each
option. When running the pipeline, use the command-line arguments
`--bootstrapServer` and `--topicName` to set these parameters.
+```java
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+```
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadKafkaTopicOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
Review Comment:
```suggestion
In the `main` method, the `PipelineOptionsFactory` class creates a
`ReadKafkaTopicOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options.
```
##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+ public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ public static void main(String[] args) {
+ ReadKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+ Pipeline p = Pipeline.create(options);
+
+ p
+ .apply(KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ )
+ .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer,
String>, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> kv = c.element().getKV();
+ String message = String.format("Message Key:
%s, Message Value: %s",
+ kv.getKey(), kv.getValue());
+ LOG.info(message);
+ c.output(message);
+ }
+ })
+ );
+
+ p.run();
+ }
+ }
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO connector to read data from a JDBC source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
Review Comment:
```suggestion
The `SampleRow` class models the rows fetched from the JDBC source. It
includes the fields `id`, `month`, and `amount` that correspond to the
constants representing column names in the database table. Additionally, it
overrides the `toString` method to provide a string representation of the
`SampleRow` object.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Apache Kafka topic
using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Google Pub/Sub topic using the
Apache Beam PubSubIO connector.
Review Comment:
```suggestion
Your Apache Beam pipeline can write data to a Kafka topic using the Apache
Beam KafkaIO connector.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
Review Comment:
```suggestion
The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
extracts column values from the `ResultSet` interface and creates an
`ExampleRow` object.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam
JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
Review Comment:
```suggestion
Here is an example of how to use the JdbcIO connector to accomplish this:
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a
`ReadFromJdbcPartitionOptions` object from the command-line arguments. The
`Pipeline.create` method creates a new pipeline with the specified options. The
`DataSourceConfiguration` is created using the options for the JDBC source.
+
+```java
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+```
+
+The `JdbcIO.readWithPartitions` method is used to read data from the JDBC
source in parallel. The `withNumPartitions` method specifies the number of
partitions to use. The `withPartitionColumn` method specifies the column to use
for partitioning the data. The `withLowerBound` and `withUpperBound` methods
specify the lower and upper bounds of the data to read.
Review Comment:
```suggestion
To read data from the JDBC source in parallel, the code snippet uses the
`JdbcIO.readWithPartitions` method. The `withNumPartitions` method
determines how many partitions the data will be split into for parallel
processing. The `withPartitionColumn` method specifies the column to use for
partitioning the data. The `withLowerBound` and `withUpperBound` methods
specify the lower and upper bounds of the data to read.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
Review Comment:
```suggestion
The `ReadFromJdbcPartitionOptions` interface specifies the table name, JDBC
source URL, driver class name, username, and password. The `@Description`
annotation provides a description of the option. Use the command-line arguments
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` to set these options when executing the pipeline.
```
##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Apache Kafka topic
using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Google Pub/Sub topic using the
Apache Beam PubSubIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
Review Comment:
```suggestion
Write a sample Java code snippet that writes data to a JDBC sink using
Apache Beam.
```
##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to an Apache Kafka topic
using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to an Apache Kafka topic using the
Apache Beam KafkaIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
Review Comment:
```suggestion
Here is an example of how to use the KafkaIO connector to accomplish this:
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a JDBC sink using the Apache Beam
JdbcIO connector.
Review Comment:
```suggestion
Your Apache Beam pipeline can write data to an Oracle Java Database
Connectivity (JDBC) enabled database using the Apache Beam JdbcIO connector.
```
##########
learning/prompts/code-generation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,79 @@
+Prompt:
+Write a sample Java code snippet that writes data to a TFRecord file using
Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a TFRecord file using the Apache
Beam TFRecordIO connector.
Review Comment:
```suggestion
Your Apache Beam pipeline can write data to a TFRecord file using the Apache
Beam TFRecordIO connector.
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a JDBC sink using the Apache Beam
JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector
Review Comment:
```suggestion
// Pipeline to write data to a JDBC sink using the Apache Beam JdbcIO
connector
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a JDBC sink using the Apache Beam
JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector
+public class WriteJdbcSink {
+
+ // Class representing the data to be written to the JDBC sink
+ public static class ExampleRow implements Serializable {
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public ExampleRow() {}
+
+ public ExampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getMonth() {
+ return month;
+ }
+
+ public String getAmount() {
+ return amount;
+ }
+ }
+
+ // Pipeline options for writing data to the JDBC sink
+ public interface WriteJdbcSinkOptions extends PipelineOptions {
+ @Description("Table name to write to")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC sink URL")
+ @Validation.Required
+ String getJdbcSinkUrl();
+
+ void setJdbcSinkUrl(String jdbcSinkUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("DB Username")
+ @Validation.Required
+ String getSinkUsername();
+
+ void setSinkUsername(String username);
+
+ @Description("DB password")
+ @Validation.Required
+ String getSinkPassword();
+
+ void setSinkPassword(String password);
+ }
+
+ // Main method to run the pipeline
+ public static void main(String[] args) {
+ // Parse the pipeline options from the command line
+ WriteJdbcSinkOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteJdbcSinkOptions.class);
+
+ // Create the JDBC sink configuration using the provided options
+ JdbcIO.DataSourceConfiguration config =
+
JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSinkUrl())
+ .withUsername(options.getSinkUsername())
+ .withPassword(options.getSinkPassword());
+
+ // Create the pipeline
+ Pipeline p = Pipeline.create(options);
+
+ // Create sample rows to write to the JDBC sink
+ List<ExampleRow> rows = Arrays.asList(
+ new ExampleRow(1, "January", "$1000"),
+ new ExampleRow(2, "February", "$2000"),
+ new ExampleRow(3, "March", "$3000")
+ );
+
+ // // Create PCollection from the list of rows
+ p.apply("Create collection of records", Create.of(rows))
+ // Write the rows to the JDBC sink
+ .apply(
+ "Write to JDBC Sink",
+ JdbcIO.<ExampleRow>write()
+ .withDataSourceConfiguration(config)
+ .withStatement(String.format("insert into %s values(?, ?, ?)",
options.getTableName()))
+ .withBatchSize(10L)
+ .withPreparedStatementSetter(
+ (element, statement) -> {
+ statement.setInt(1, element.getId());
+ statement.setString(2, element.getMonth());
+ statement.setString(3, element.getAmount());
+ }));
+ // Run the pipeline
+ p.run();
+ }
+}
+
+```
Review Comment:
```suggestion
}
```
```
##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to an Apache Kafka topic
using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to an Apache Kafka topic using the
Apache Beam KafkaIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+
+package kafka;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a Kafka topic using Apache Beam KafkaIO connector
+public class WriteKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(WriteKafkaTopic.class);
+
+ // Pipeline options for writing data to Kafka topic
+ public interface WriteKafkaTopicOptions extends WriteKafkaTopicOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ // Main method to run the pipeline
+ public static void main(String[] args) {
+
+ // Parse the pipeline options from the command line
+ WriteKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteKafkaTopicOptions.class);
+
+ // Create the pipeline
+ Pipeline p = Pipeline.create(options);
+
+ // Create sample messages to write to the Kafka topic
+ final List<KV<Integer, String>> messages = Arrays.asList(
+ KV.of(1, "Sample message 1"),
+ KV.of(2, "Sample message 2"),
+ KV.of(3, "Sample message 3")
+ );
+
+ // Write the messages to the Kafka topic
+ p
+ // Create PCollection from the list of sample messages
+ .apply(Create.of(messages))
+ // Write the messages to the Kafka topic using the KafkaIO
connector
+ .apply(KafkaIO.<Integer, String>write()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ // Specify the key and value serializers for the Kafka
topic
+ .withValueSerializer(StringSerializer.class)
+ .withKeySerializer(IntegerSerializer.class)
+ );
+ // Run the pipeline
+ p.run();
+ }
+}
+
+```
+This code snippet utilizes the pipeline options pattern to parse command-line
arguments.
Review Comment:
```suggestion
This code snippet utilizes the pipeline options pattern to parse
command-line arguments.
```
##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+ public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+
+ public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
+ @Default.String("org.postgresql.Driver")
+ String getDriverClassName();
+
+ void setDriverClassName(String driverClassName);
+
+ @Description("Jdbc DB Username")
+ @Validation.Required
+ String getSourceUsername();
+
+ void setSourceUsername(String username);
+
+ @Description("Jdbc DB password")
+ @Validation.Required
+ String getSourcePassword();
+
+ void setSourcePassword(String password);
+ }
+
+ public static void main(String[] args) {
+ ReadFromJdbcPartitionOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+ JdbcIO.DataSourceConfiguration config =
+ JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(),
options.getJdbcSourceUrl())
+ .withUsername(options.getSourceUsername())
+ .withPassword(options.getSourcePassword());
+
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "Read from Jdbc source",
+ JdbcIO.<ExampleRow>readWithPartitions()
+ .withDataSourceConfiguration(config)
+ .withRowMapper(new CreateExampleRow())
+ .withTable(options.getTableName())
+ .withNumPartitions(1)
+ .withPartitionColumn(ExampleRow.ID_COLUMN)
+ .withLowerBound(0L)
+ .withUpperBound(1000L))
+ .apply(
+ "Log Source Data",
+ ParDo.of(
+ new DoFn<ExampleRow, ExampleRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ LOG.info(Objects.requireNonNull(c.element()).toString());
+ c.output(c.element());
+ }
+ }));
+
+ p.run();
+ }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO connector to read data from a JDBC source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source.
It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`,
`MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the
database table. The `toString` method is overridden to provide a string
representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+ public static final String ID_COLUMN = "id";
+ public static final String MONTH_COLUMN = "month";
+ public static final String AMOUNT_COLUMN = "amount";
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public SampleRow() {}
+
+ public SampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ @Override
+ public String toString() {
+ return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" +
amount + '\'' + '}';
+ }
+ }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to
map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method
reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+ public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow>
{
+ @Override
+ public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+ return new ExampleRow(
+ Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+ resultSet.getString(ExampleRow.MONTH_COLUMN),
+ resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+ }
+ }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table
name, JDBC source URL, driver class name, username, and password. The
`@Description` annotation provides a description of the option. Use
`--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and
`--sourcePassword` command line arguments to set these options when running the
pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+ @Description("Table name to read from")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC Source URL")
+ @Validation.Required
+ String getJdbcSourceUrl();
+
+ void setJdbcSourceUrl(String jdbcUrl);
+
+ @Description("")
Review Comment:
```suggestion
@Description("JDBC driver class name")
```
##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to an Apache Kafka topic
using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to an Apache Kafka topic using the
Apache Beam KafkaIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+
+package kafka;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a Kafka topic using Apache Beam KafkaIO connector
+public class WriteKafkaTopic {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(WriteKafkaTopic.class);
+
+ // Pipeline options for writing data to Kafka topic
+ public interface WriteKafkaTopicOptions extends WriteKafkaTopicOptions {
+ @Description("Kafka Bootstrap Server")
+ String getBootstrapServer();
+
+ void setBootstrapServer(String value);
+
+ @Description("Kafka Topic Name")
+ String getTopicName();
+
+ void setTopicName(String value);
+ }
+
+ // Main method to run the pipeline
+ public static void main(String[] args) {
+
+ // Parse the pipeline options from the command line
+ WriteKafkaTopicOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteKafkaTopicOptions.class);
+
+ // Create the pipeline
+ Pipeline p = Pipeline.create(options);
+
+ // Create sample messages to write to the Kafka topic
+ final List<KV<Integer, String>> messages = Arrays.asList(
+ KV.of(1, "Sample message 1"),
+ KV.of(2, "Sample message 2"),
+ KV.of(3, "Sample message 3")
+ );
+
+ // Write the messages to the Kafka topic
+ p
+ // Create PCollection from the list of sample messages
+ .apply(Create.of(messages))
+ // Write the messages to the Kafka topic using the KafkaIO
connector
+ .apply(KafkaIO.<Integer, String>write()
+ .withBootstrapServers(options.getBootstrapServer())
+ .withTopic(options.getTopicName())
+ // Specify the key and value serializers for the Kafka
topic
+ .withValueSerializer(StringSerializer.class)
+ .withKeySerializer(IntegerSerializer.class)
+ );
+ // Run the pipeline
+ p.run();
+ }
+}
+
Review Comment:
```suggestion
```
##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink using Apache
Beam.
+
+Response:
+Your Apache Beam pipeline can write data to a JDBC sink using the Apache Beam
JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector
+public class WriteJdbcSink {
+
+ // Class representing the data to be written to the JDBC sink
+ public static class ExampleRow implements Serializable {
+
+ private int id;
+ private String month;
+ private String amount;
+
+ public ExampleRow() {}
+
+ public ExampleRow(int id, String month, String amount) {
+ this.id = id;
+ this.month = month;
+ this.amount = amount;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getMonth() {
+ return month;
+ }
+
+ public String getAmount() {
+ return amount;
+ }
+ }
+
+ // Pipeline options for writing data to the JDBC sink
+ public interface WriteJdbcSinkOptions extends PipelineOptions {
+ @Description("Table name to write to")
+ @Validation.Required
+ String getTableName();
+
+ void setTableName(String tableName);
+
+ @Description("JDBC sink URL")
+ @Validation.Required
+ String getJdbcSinkUrl();
+
+ void setJdbcSinkUrl(String jdbcSinkUrl);
+
+ @Description("")
Review Comment:
```suggestion
@Description("JDBC driver class name")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]