[ 
https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=450655&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-450655
 ]

ASF GitHub Bot logged work on BEAM-9977:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jun/20 21:30
            Start Date: 24/Jun/20 21:30
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445181264



##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {

Review comment:
       This will get pulled into the generated schema which I don't think is 
your intention. You should change the name so it's not a getter, or add 
`@SchemaIgnore`

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {
+    if (topicPartition == null) {
+      topicPartition = new TopicPartition(getTopic(), getPartition());
+    }
+    return topicPartition;
+  }
+
+  public static KafkaSourceDescription of(
+      TopicPartition topicPartition,
+      Long startReadOffset,
+      Instant startReadTime,
+      List<String> bootstrapServers) {
+    return new AutoValue_KafkaSourceDescription(
+        topicPartition.topic(),
+        topicPartition.partition(),
+        startReadOffset,
+        startReadTime,
+        bootstrapServers);
+  }
+
+  public static Coder<KafkaSourceDescription> getCoder(SchemaRegistry registry)
+      throws NoSuchSchemaException {
+    return SchemaCoder.of(
+        registry.getSchema(KafkaSourceDescription.class),
+        TypeDescriptor.of(KafkaSourceDescription.class),
+        registry.getToRowFunction(KafkaSourceDescription.class),
+        registry.getFromRowFunction(KafkaSourceDescription.class));
+  }

Review comment:
       I don't think you should need this function. By registering a schema 
for KafkaSourceDescription, Beam will default to using SchemaCoder

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")

Review comment:
       +1
   
   The default in the inferred schema should be camel-case with the first 
letter lower-case so this would be a no-op as written (same with topic and 
partition, but there's value in making them explicit if you want).

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -906,19 +1082,91 @@ public void setValueDeserializer(String 
valueDeserializer) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(
-              
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+      // The Read will be expanded into SDF transform when "beam_fn_api" is 
enabled and
+      // "beam_fn_api_use_deprecated_read" is not enabled.
+      if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")
+          || ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), 
"beam_fn_api_use_deprecated_read")) {
+        // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
+        Unbounded<KafkaRecord<K, V>> unbounded =
+            org.apache.beam.sdk.io.Read.from(
+                
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+
+        PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = 
unbounded;
+
+        if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+          transform =
+              
unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+        }
 
-      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+        return input.getPipeline().apply(transform);
+      }
+      ReadAll<K, V> readTransform =
+          ReadAll.<K, V>read()
+              .withConsumerConfigOverrides(getConsumerConfig())
+              .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+              .withConsumerFactoryFn(getConsumerFactoryFn())
+              .withKeyDeserializerProvider(getKeyDeserializerProvider())
+              .withValueDeserializerProvider(getValueDeserializerProvider())
+              .withManualWatermarkEstimator()
+              .withTimestampPolicyFactory(getTimestampPolicyFactory());
+      if (isCommitOffsetsInFinalizeEnabled()) {
+        readTransform = readTransform.commitOffsets();
+      }
+      PCollection<KafkaSourceDescription> output =
+          input
+              .getPipeline()
+              .apply(Impulse.create())
+              .apply(ParDo.of(new GenerateKafkaSourceDescription(this)));
+      try {
+        
output.setCoder(KafkaSourceDescription.getCoder(input.getPipeline().getSchemaRegistry()));

Review comment:
       This should happen automatically because there's a schema registered for 
`KafkaSourceDescription`. Was that not working?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 450655)
    Time Spent: 18h 10m  (was: 18h)

> Build Kafka Read on top of Java SplittableDoFn
> ----------------------------------------------
>
>                 Key: BEAM-9977
>                 URL: https://issues.apache.org/jira/browse/BEAM-9977
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 18h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to