mosche commented on code in PR #24992:
URL: https://github.com/apache/beam/pull/24992#discussion_r1086378332


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -481,11 +489,45 @@ public long getSplitBacklogBytes() {
     }
   }
 
+  public static class CounterMarkCoder extends CustomCoder<CounterMark> {

Review Comment:
   Is this encoding byte-compatible with the previous Avro-based encoding? 
Unlikely, but if not, it might cause issues if this is ever persisted, e.g. in a 
snapshot.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java:
##########
@@ -42,12 +42,30 @@ private Providers() {}
   public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> 
klass) {
     Map<String, T> providers = new HashMap<>();
     for (T provider : ServiceLoader.load(klass)) {
-      checkArgument(
-          !providers.containsKey(provider.identifier()),
-          "Duplicate providers exist with identifier `%s` for class %s.",
-          provider.identifier(),
-          klass);
-      providers.put(provider.identifier(), provider);
+      // Avro provider is treated as a special case until two providers may 
exist: in "core"
+      // (deprecated) and in "extensions/avro" (actual).
+      if (provider.identifier().equals("avro")) {
+        // Avro provider from "extensions/avro" must have a priority.
+        if (provider
+            .toString()
+            .startsWith(
+                
"org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider"))
 {
+          // Use AvroPayloadSerializerProvider from extensions/avro by any 
case.
+          providers.put(provider.identifier(), provider);
+        } else {
+          // Load Avro provider from "core" if it was not loaded from Avro 
extension before.
+          if (!providers.containsKey(provider.identifier())) {

Review Comment:
   Please use `putIfAbsent` instead



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java:
##########
@@ -42,12 +42,30 @@ private Providers() {}
   public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> 
klass) {
     Map<String, T> providers = new HashMap<>();
     for (T provider : ServiceLoader.load(klass)) {
-      checkArgument(
-          !providers.containsKey(provider.identifier()),
-          "Duplicate providers exist with identifier `%s` for class %s.",
-          provider.identifier(),
-          klass);
-      providers.put(provider.identifier(), provider);
+      // Avro provider is treated as a special case until two providers may 
exist: in "core"
+      // (deprecated) and in "extensions/avro" (actual).
+      if (provider.identifier().equals("avro")) {
+        // Avro provider from "extensions/avro" must have a priority.
+        if (provider

Review Comment:
   Please don't rely on the default `toString()` for this and use 
`getClass().getName()` instead.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java:
##########
@@ -42,12 +42,30 @@ private Providers() {}
   public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> 
klass) {
     Map<String, T> providers = new HashMap<>();
     for (T provider : ServiceLoader.load(klass)) {
-      checkArgument(
-          !providers.containsKey(provider.identifier()),
-          "Duplicate providers exist with identifier `%s` for class %s.",
-          provider.identifier(),
-          klass);
-      providers.put(provider.identifier(), provider);
+      // Avro provider is treated as a special case until two providers may 
exist: in "core"
+      // (deprecated) and in "extensions/avro" (actual).
+      if (provider.identifier().equals("avro")) {
+        // Avro provider from "extensions/avro" must have a priority.
+        if (provider
+            .toString()
+            .startsWith(
+                
"org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider"))
 {
+          // Use AvroPayloadSerializerProvider from extensions/avro by any 
case.
+          providers.put(provider.identifier(), provider);
+        } else {
+          // Load Avro provider from "core" if it was not loaded from Avro 
extension before.
+          if (!providers.containsKey(provider.identifier())) {
+            providers.put(provider.identifier(), provider);
+          }
+        }
+      } else {
+        checkArgument(

Review Comment:
   Maybe `checkState` would be better here?



##########
sdks/java/io/kafka/build.gradle:
##########
@@ -90,6 +91,7 @@ dependencies {
   provided library.java.everit_json_schema
   testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
   testImplementation project(":sdks:java:io:synthetic")
+  testImplementation project(path: ":sdks:java:extensions:avro", 
configuration: "testRuntimeMigration")

Review Comment:
   Shouldn't `implementation` be enough?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java:
##########
@@ -70,6 +68,8 @@
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

Review Comment:
   How about only registering these if they exist on the classpath? That way we 
can skip adding the dependency to the runner by default.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java:
##########
@@ -42,12 +42,30 @@ private Providers() {}
   public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> 
klass) {
     Map<String, T> providers = new HashMap<>();
     for (T provider : ServiceLoader.load(klass)) {
-      checkArgument(
-          !providers.containsKey(provider.identifier()),
-          "Duplicate providers exist with identifier `%s` for class %s.",
-          provider.identifier(),
-          klass);
-      providers.put(provider.identifier(), provider);
+      // Avro provider is treated as a special case until two providers may 
exist: in "core"

Review Comment:
   ```suggestion
         // Avro provider is treated as a special case as two providers may 
exist: in "core"
   ```



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/CoderRegistryTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.coders;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviderRegistrar;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for CoderRegistry and AvroCoder. */
+@RunWith(JUnit4.class)
+public class CoderRegistryTest {
+

Review Comment:
   What's the purpose of this test? As far as I can see, it doesn't test 
anything Avro-specific, just the general behavior of the CoderRegistry. Do we 
need this test at all?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java:
##########
@@ -481,11 +489,45 @@ public long getSplitBacklogBytes() {
     }
   }
 
+  public static class CounterMarkCoder extends CustomCoder<CounterMark> {

Review Comment:
   Though, considering `GenerateSequence` is the recommended alternative, I'm 
not sure we need to worry much about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to