This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 357fd262211 Revert #37631 and #38497 on HEAD (#38516)
357fd262211 is described below

commit 357fd2622114ccce99726baa5e9735895e5392ed
Author: Yi Hu <[email protected]>
AuthorDate: Fri May 15 17:02:41 2026 -0400

    Revert #37631 and #38497 on HEAD (#38516)
    
    * Revert "Adds a new coder translator for Java SchemaCoder.  (#37631)"
    
    This reverts commit 81769cbfd132ed3c257685fbdc87f076b903e9f5.
    
    * Revert "Sickbay two failed tests due to new schema coder urn. (#38497)"
    
    This reverts commit 3dbd7c8c35b6a396c5e6c6fed2a3b37d4f731252.
---
 .../beam_PostCommit_Java_ValidatesRunner_ULR.json  |  3 +-
 CHANGES.md                                         |  5 +-
 .../dataflow/DataflowPipelineTranslator.java       |  2 +-
 .../beam/runners/dataflow/DataflowRunner.java      | 59 +++++++---------
 .../dataflow/DataflowPipelineTranslatorTest.java   | 77 ++++-----------------
 .../control/ProcessBundleDescriptorsTest.java      |  6 +-
 runners/portability/java/build.gradle              |  5 --
 .../sdk/util/construction/CoderTranslation.java    | 62 ++---------------
 .../sdk/util/construction/CoderTranslator.java     |  1 -
 .../construction/CoderTranslatorRegistrar.java     | 16 -----
 .../sdk/util/construction/CoderTranslators.java    | 79 ----------------------
 .../sdk/util/construction/ModelCoderRegistrar.java | 28 ++------
 .../beam/sdk/util/construction/ModelCoders.java    |  2 -
 .../util/construction/RehydratedComponents.java    |  3 +-
 .../beam/sdk/util/construction/SdkComponents.java  | 39 +++++------
 .../util/construction/CoderTranslationTest.java    | 38 +----------
 .../sdk/expansion/service/ExpansionService.java    |  2 +-
 .../extensions/avro/AvroGenericCoderRegistrar.java | 18 -----
 .../beam/fn/harness/state/StateBackedIterable.java | 22 ------
 19 files changed, 73 insertions(+), 394 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
index fbd81891f93..6e2f429dd24 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
@@ -2,6 +2,5 @@
   "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
-  "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
-  "https://github.com/apache/beam/pull/38497": "sickbay two failed tests"
+  "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
 }
diff --git a/CHANGES.md b/CHANGES.md
index ca911e52a7a..52475a99d8e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -85,9 +85,6 @@
 
 ## Breaking Changes
 
-* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
-  - Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)).
-  - Fixes ([#36496](https://github.com/apache/beam/issues/36496)), ([#30276](https://github.com/apache/beam/issues/30276)), ([#29245](https://github.com/apache/beam/issues/29245)).
 * (Python) Made Beartype the default fallback type checking tool. This can be 
disabled with the `--disable_beartype` pipeline option. 
([#38275](https://github.com/apache/beam/issues/38275))
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
 
@@ -2441,4 +2438,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
 
 ## Highlights
 
-- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
\ No newline at end of file
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1609cf6ea23..4016f31a547 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -221,7 +221,7 @@ public class DataflowPipelineTranslator {
   private static byte[] serializeWindowingStrategy(
       WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
     try {
-      SdkComponents sdkComponents = SdkComponents.create(options);
+      SdkComponents sdkComponents = SdkComponents.create();
 
       String workerHarnessContainerImageURL =
           
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c19673a3117..299e7fa21ed 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1333,20 +1333,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     // with the SDK harness image (which implements Fn API).
     //
     // The same Environment is used in different and contradictory ways, 
depending on whether
-    // it is a portable or non-portable job submission.
+    // it is a v1 or v2 job submission.
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(workerHarnessContainerImageURL);
 
-    // The SdkComponents for portable and non-portable job submission must be 
kept distinct. Both
+    // The SdkComponents for portable an non-portable job submission must be 
kept distinct. Both
     // need the default environment.
-    SdkComponents portableComponents =
-        SdkComponents.create(
-            options,
-            defaultEnvironmentForDataflow
-                .toBuilder()
-                .addAllDependencies(getDefaultArtifacts())
-                .addAllCapabilities(Environments.getJavaCapabilities())
-                .build());
+    SdkComponents portableComponents = SdkComponents.create();
+    portableComponents.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
 
     RunnerApi.Pipeline portablePipelineProto =
         PipelineTranslation.toProto(pipeline, portableComponents, false);
@@ -1375,30 +1374,28 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
 
     if (useUnifiedWorker(options)) {
-      LOG.info(
-          "Skipping non-portable transform replacements since job will run on 
portable worker.");
+      LOG.info("Skipping v1 transform replacements since job will run on v2.");
     } else {
-      // Now rewrite things to be as needed for non-portable (mutates the 
pipeline).
-      // This way the job submitted is valid for portable and non-portable, 
simultaneously.
+      // Now rewrite things to be as needed for v1 (mutates the pipeline)
+      // This way the job submitted is valid for v1 and v2, simultaneously
       replaceV1Transforms(pipeline);
     }
-    // Capture the SdkComponents for look up during step translations.
-    SdkComponents dataflowNonPortableComponents =
-        SdkComponents.create(
-            options,
-            defaultEnvironmentForDataflow
-                .toBuilder()
-                .addAllDependencies(getDefaultArtifacts())
-                .addAllCapabilities(Environments.getJavaCapabilities())
-                .build());
-    // No need to perform transform upgrading for the non-portable runner 
proto.
-    RunnerApi.Pipeline dataflowNonPortablePipelineProto =
-        PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, 
true, false);
+    // Capture the SdkComponents for look up during step translations
+    SdkComponents dataflowV1Components = SdkComponents.create();
+    dataflowV1Components.registerEnvironment(
+        defaultEnvironmentForDataflow
+            .toBuilder()
+            .addAllDependencies(getDefaultArtifacts())
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
+    // No need to perform transform upgrading for the Runner v1 proto.
+    RunnerApi.Pipeline dataflowV1PipelineProto =
+        PipelineTranslation.toProto(pipeline, dataflowV1Components, true, 
false);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(
-          "Dataflow non-portable worker pipeline proto:\n{}",
-          
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
+          "Dataflow v1 pipeline proto:\n{}",
+          TextFormat.printer().printToString(dataflowV1PipelineProto));
     }
 
     // Set a unique client_request_id in the CreateJob request.
@@ -1418,11 +1415,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
     JobSpecification jobSpecification =
         translator.translate(
-            pipeline,
-            dataflowNonPortablePipelineProto,
-            dataflowNonPortableComponents,
-            this,
-            packages);
+            pipeline, dataflowV1PipelineProto, dataflowV1Components, this, 
packages);
 
     if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && 
!useUnifiedWorker(options)) {
       List<String> experiments =
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 953f7a638ed..f8818931a68 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -47,7 +47,6 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.auto.value.AutoValue;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -93,8 +92,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
@@ -169,11 +166,15 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   private SdkComponents createSdkComponents(PipelineOptions options) {
+    SdkComponents sdkComponents = SdkComponents.create();
+
     String containerImageURL =
         
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
     RunnerApi.Environment defaultEnvironmentForDataflow =
         Environments.createDockerEnvironment(containerImageURL);
-    return SdkComponents.create(options, defaultEnvironmentForDataflow);
+
+    sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
+    return sdkComponents;
   }
 
   // A Custom Mockito matcher for an initial Job that checks that all
@@ -1293,16 +1294,15 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     file1.deleteOnExit();
     File file2 = File.createTempFile("file2-", ".txt");
     file2.deleteOnExit();
-    SdkComponents sdkComponents =
-        SdkComponents.create(
-            options,
-            
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
-                .toBuilder()
-                .addAllDependencies(
-                    Environments.getArtifacts(
-                        ImmutableList.of("file1.txt=" + file1, "file2.txt=" + 
file2)))
-                .addAllCapabilities(Environments.getJavaCapabilities())
-                .build());
+    SdkComponents sdkComponents = SdkComponents.create();
+    sdkComponents.registerEnvironment(
+        
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
+            .toBuilder()
+            .addAllDependencies(
+                Environments.getArtifacts(
+                    ImmutableList.of("file1.txt=" + file1, "file2.txt=" + 
file2)))
+            .addAllCapabilities(Environments.getJavaCapabilities())
+            .build());
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents, true);
 
@@ -1870,53 +1870,4 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
       return null;
     }
   }
-
-  @AutoValue
-  @DefaultSchema(AutoValueSchema.class)
-  public abstract static class SimpleAutoValue {
-    public abstract String getString();
-
-    public abstract int getInt32();
-
-    public abstract long getInt64();
-
-    public static DataflowPipelineTranslatorTest.SimpleAutoValue of(
-        String string, int int32, long int64) {
-      return new 
AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
-    }
-  }
-
-  @Test
-  public void testSchemaCoderTranslation() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline pipeline = Pipeline.create(options);
-    pipeline
-        .apply(Impulse.create())
-        .apply(
-            MapElements.via(
-                new SimpleFunction<byte[], SimpleAutoValue>() {
-                  @Override
-                  public SimpleAutoValue apply(byte[] input) {
-                    return SimpleAutoValue.of("foo", 5, 10L);
-                  }
-                }))
-        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
-    {
-      SdkComponents sdkComponents = createSdkComponents(options);
-      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents, true);
-      Map<String, RunnerApi.Coder> coders = 
pipelineProto.getComponents().getCodersMap();
-      assertTrue(coders.containsKey("SchemaCoder"));
-      assertEquals("beam:coder:schema:v1", 
coders.get("SchemaCoder").getSpec().getUrn());
-    }
-
-    // Prior to version 2.74, SchemaCoders are translated as custom java 
coders.
-    {
-      options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
-      SdkComponents sdkComponents = createSdkComponents(options);
-      RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
sdkComponents, true);
-      Map<String, RunnerApi.Coder> coders = 
pipelineProto.getComponents().getCodersMap();
-      assertTrue(coders.containsKey("SchemaCoder"));
-      assertEquals("beam:coders:javasdk:0.1", 
coders.get("SchemaCoder").getSpec().getUrn());
-    }
-  }
 }
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
index 9ea7404053d..21d7550c38b 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
@@ -78,8 +78,7 @@ public class ProcessBundleDescriptorsTest implements 
Serializable {
     // Add another stateful stage with a non-standard key coder
     Pipeline p = Pipeline.create();
     Coder<Void> keycoder = VoidCoder.of();
-    ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
-    assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), 
is(false));
+    assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
     p.apply("impulse", Impulse.create())
         .apply(
             "create",
@@ -166,8 +165,7 @@ public class ProcessBundleDescriptorsTest implements 
Serializable {
   public void testLengthPrefixingOfInputCoderExecutableStage() throws 
Exception {
     Pipeline p = Pipeline.create();
     Coder<Void> voidCoder = VoidCoder.of();
-    ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
-    assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), 
is(false));
+    assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
     p.apply("impulse", Impulse.create())
         .apply(
             ParDo.of(
diff --git a/runners/portability/java/build.gradle 
b/runners/portability/java/build.gradle
index aa147e8426c..6e3b431e802 100644
--- a/runners/portability/java/build.gradle
+++ b/runners/portability/java/build.gradle
@@ -214,11 +214,6 @@ def createUlrValidatesRunnerTask = { name, 
environmentType, dockerImageTask = ""
       // TODO(https://github.com/apache/beam/issues/31231)
       excludeTestsMatching 
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
 
-      // TODO(https://github.com/apache/beam/issues/33859): Failed with 
"KeyError: 'beam:coder:schema:v1'".
-      // New schema coder urn is not yet supported in runners other than 
dataflow
-      excludeTestsMatching 
'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithShuffle'
-      excludeTestsMatching 
'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithoutShuffle'
-
       for (String test : sickbayTests) {
         excludeTestsMatching test
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
index 2cc4bf0c6a0..22859dc68b9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
@@ -25,13 +25,11 @@ import java.util.ServiceLoader;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.dataflow.qual.Deterministic;
@@ -64,8 +62,6 @@ public class CoderTranslation {
 
   private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> 
knownCoderUrns;
 
-  private static @MonotonicNonNull List<CoderTranslatorRegistrar> 
coderTranslatorRegistrars;
-
   private static @MonotonicNonNull Map<Class<? extends Coder>, 
CoderTranslator<? extends Coder>>
       knownTranslators;
 
@@ -84,53 +80,6 @@ public class CoderTranslation {
     return knownCoderUrns;
   }
 
-  private static void initializeCoderTranslatorRegistrars() {
-    ImmutableList.Builder<CoderTranslatorRegistrar> registrars = 
ImmutableList.builder();
-    for (CoderTranslatorRegistrar coderTranslatorRegistrar :
-        ServiceLoader.load(CoderTranslatorRegistrar.class)) {
-      registrars.add(coderTranslatorRegistrar);
-    }
-    coderTranslatorRegistrars = registrars.build();
-  }
-
-  static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
-    if (coderTranslatorRegistrars == null) {
-      initializeCoderTranslatorRegistrars();
-    }
-    for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
-      if (registrar.isKnownCoder(coder, options)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends 
Coder> coderClass) {
-    if (coderTranslatorRegistrars == null) {
-      initializeCoderTranslatorRegistrars();
-    }
-    for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
-      CoderTranslator translator = registrar.getCoderTranslator(coderClass);
-      if (translator != null) {
-        return translator;
-      }
-    }
-    return null;
-  }
-
-  static Class<? extends Coder> getCoderForUrn(String coderUrn) {
-    if (coderTranslatorRegistrars == null) {
-      initializeCoderTranslatorRegistrars();
-    }
-    for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
-      Class<? extends Coder> coder = registrar.getCoderForUrn(coderUrn);
-      if (coder != null) {
-        return coder;
-      }
-    }
-    return null;
-  }
-
   @VisibleForTesting
   @Deterministic
   static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> 
getKnownTranslators() {
@@ -158,7 +107,7 @@ public class CoderTranslation {
 
   public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents 
components)
       throws IOException {
-    if (isKnownCoder(coder, components.getPipelineOptions())) {
+    if (getKnownCoderUrns().containsKey(coder.getClass())) {
       return toKnownCoder(coder, components);
     }
 
@@ -180,10 +129,7 @@ public class CoderTranslation {
 
   private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents 
components)
       throws IOException {
-    CoderTranslator translator = getCoderTranslator(coder.getClass());
-    if (translator == null) {
-      throw new IOException("Unable to find CoderTranslator for known Coder");
-    }
+    CoderTranslator translator = getKnownTranslators().get(coder.getClass());
     List<String> componentIds = registerComponents(coder, translator, 
components);
     return RunnerApi.Coder.newBuilder()
         .addAllComponentCoderIds(componentIds)
@@ -240,8 +186,8 @@ public class CoderTranslation {
                   components.getComponents().getCodersOrThrow(componentId), 
components, context);
       coderComponents.add(innerCoder);
     }
-    Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
-    CoderTranslator<?> translator = getCoderTranslator(coderType);
+    Class<? extends Coder> coderType = 
getKnownCoderUrns().inverse().get(coderUrn);
+    CoderTranslator<?> translator = getKnownTranslators().get(coderType);
     if (translator != null) {
       return translator.fromComponents(
           coderComponents, coder.getSpec().getPayload().toByteArray(), 
context);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
index 78f5b61c0f0..3d89c4c7ff4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
@@ -28,7 +28,6 @@ import 
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext
  * additional payload, which is not currently supported. This exists as a 
temporary measure.
  */
 public interface CoderTranslator<T extends Coder<?>> {
-
   /** Extract all component {@link Coder coders} within a coder. */
   List<? extends Coder<?>> getComponents(T from);
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
index 44e8c2956ae..b69d0290de5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.util.construction;
 
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A registrar of {@link Coder} URNs to the associated {@link 
CoderTranslator}. */
 @SuppressWarnings({
@@ -36,18 +34,4 @@ public interface CoderTranslatorRegistrar {
 
   /** Returns a mapping of URN to {@link CoderTranslator}. */
   Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> 
getCoderTranslators();
-
-  /**
-   * Returns whether the given Coder is known to this 
CoderTranslatorRegistrar. If the Coder is
-   * known, then getCoderTranslator() will return a non-null CoderTranslator.
-   */
-  boolean isKnownCoder(Coder<?> coder, PipelineOptions options);
-
-  /** Returns the CoderTranslator to use for this Coder, or null if the Coder 
is not known. */
-  @Nullable
-  CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> 
coderClass);
-
-  /** Returns the Coder to use for the given Urn, or null if the Urn is for an 
unknown Coder. */
-  @Nullable
-  Class<? extends Coder> getCoderForUrn(String coderUrn);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
index a847bf780df..84a90721a98 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util.construction;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.model.pipeline.v1.SchemaApi;
@@ -31,19 +30,12 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.ShardedKey;
-import 
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 
@@ -185,77 +177,6 @@ class CoderTranslators {
     };
   }
 
-  static <T> CoderTranslator<SchemaCoder<T>> schema() {
-    return new CoderTranslator<SchemaCoder<T>>() {
-      private static final String TO_ROW_FUNCTION_URN = 
"beam:torowfn:javasdk:v1";
-      private static final String FROM_ROW_FUNCTION_URN = 
"beam:fromrowfn:javasdk:v1";
-      private static final String TYPE_DESCRIPTOR_URN = 
"beam:typedescriptor:javasdk:v1";
-
-      @Override
-      public ImmutableList<? extends Coder<?>> getComponents(SchemaCoder<T> 
from) {
-        return ImmutableList.of();
-      }
-
-      @Override
-      public byte[] getPayload(SchemaCoder<T> from) {
-        SchemaApi.SchemaCoderPayload.Builder payload = 
SchemaApi.SchemaCoderPayload.newBuilder();
-        payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(), 
true));
-        payload
-            .getToRowFnBuilder()
-            .setUrn(TO_ROW_FUNCTION_URN)
-            .setPayload(
-                ByteString.copyFrom(
-                    
SerializableUtils.serializeToByteArray(from.getToRowFunction())));
-        payload
-            .getFromRowFnBuilder()
-            .setUrn(FROM_ROW_FUNCTION_URN)
-            .setPayload(
-                ByteString.copyFrom(
-                    
SerializableUtils.serializeToByteArray(from.getFromRowFunction())));
-        payload
-            .addAdditionalCoderInfosBuilder()
-            .setUrn(TYPE_DESCRIPTOR_URN)
-            .setPayload(
-                ByteString.copyFrom(
-                    
SerializableUtils.serializeToByteArray(from.getEncodedTypeDescriptor())));
-        return payload.build().toByteArray();
-      }
-
-      @Override
-      public SchemaCoder<T> fromComponents(
-          List<Coder<?>> components, byte[] payload, TranslationContext 
context) {
-        checkArgument(
-            components.isEmpty(), "Expected empty component list, but 
received: %s", components);
-        try {
-          SchemaApi.SchemaCoderPayload schemaCoderPayload =
-              SchemaApi.SchemaCoderPayload.parseFrom(payload);
-          if (schemaCoderPayload.getAdditionalCoderInfosCount() == 0) {
-            throw new IllegalArgumentException("Missing serialized 
typeDescriptor");
-          }
-          TypeDescriptor<T> typeDescriptor =
-              (TypeDescriptor<T>)
-                  SerializableUtils.deserializeFromByteArray(
-                      
schemaCoderPayload.getAdditionalCoderInfos(0).getPayload().toByteArray(),
-                      "typeDescriptor");
-          SerializableFunction<T, Row> toRowFunction =
-              (SerializableFunction<T, Row>)
-                  SerializableUtils.deserializeFromByteArray(
-                      
schemaCoderPayload.getToRowFn().getPayload().toByteArray(), "toRowFunction");
-          SerializableFunction<Row, T> fromRowFunction =
-              (SerializableFunction<Row, T>)
-                  SerializableUtils.deserializeFromByteArray(
-                      
schemaCoderPayload.getFromRowFn().getPayload().toByteArray(),
-                      "fromRowFunction");
-
-          Schema schema = 
SchemaTranslation.schemaFromProto(schemaCoderPayload.getSchema());
-          return SchemaCoder.of(schema, typeDescriptor, toRowFunction, 
fromRowFunction);
-        } catch (IOException | IllegalArgumentException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
   static CoderTranslator<ShardedKey.Coder<?>> shardedKey() {
     return new SimpleStructuredCoderTranslator<ShardedKey.Coder<?>>() {
       @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
index 1f9f1eaafbe..5b0d5aedd61 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
@@ -34,9 +34,6 @@ import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.ShardedKey;
@@ -74,7 +71,6 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
               ModelCoders.PARAM_WINDOWED_VALUE_CODER_URN)
           .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
           .put(RowCoder.class, ModelCoders.ROW_CODER_URN)
-          .put(SchemaCoder.class, ModelCoders.SCHEMA_CODER_URN)
           .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN)
           .put(TimestampPrefixingWindowCoder.class, 
ModelCoders.CUSTOM_WINDOW_CODER_URN)
           .put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN)
@@ -100,7 +96,6 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
                   CoderTranslators.paramWindowedValue())
               .put(DoubleCoder.class, 
CoderTranslators.atomic(DoubleCoder.class))
               .put(RowCoder.class, CoderTranslators.row())
-              .put(SchemaCoder.class, CoderTranslators.schema())
               .put(ShardedKey.Coder.class, CoderTranslators.shardedKey())
               .put(TimestampPrefixingWindowCoder.class, 
CoderTranslators.timestampPrefixingWindow())
               .put(NullableCoder.class, CoderTranslators.nullable())
@@ -128,6 +123,10 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
         Coder.class.getSimpleName());
   }
 
+  public static boolean isKnownCoder(Coder<?> coder) {
+    return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
+  }
+
   @Override
   public Map<Class<? extends Coder>, String> getCoderURNs() {
     return BEAM_MODEL_CODER_URNS;
@@ -137,23 +136,4 @@ public class ModelCoderRegistrar implements 
CoderTranslatorRegistrar {
   public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> 
getCoderTranslators() {
     return BEAM_MODEL_CODERS;
   }
-
-  @Override
-  public boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
-    if (coder.getClass() == SchemaCoder.class
-        && StreamingOptions.updateCompatibilityVersionLessThan(options, 
"2.74")) {
-      return false;
-    }
-    return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
-  }
-
-  @Override
-  public CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends 
Coder> coderClass) {
-    return BEAM_MODEL_CODERS.getOrDefault(coderClass, null);
-  }
-
-  @Override
-  public Class<? extends Coder> getCoderForUrn(String coderUrn) {
-    return BEAM_MODEL_CODER_URNS.inverse().getOrDefault(coderUrn, null);
-  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
index 5059cc1c6b8..7b7546aceb6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
@@ -61,7 +61,6 @@ public class ModelCoders {
       getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE);
 
   public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW);
-  public static final String SCHEMA_CODER_URN = 
getUrn(StandardCoders.Enum.SCHEMA);
 
   public static final String STATE_BACKED_ITERABLE_CODER_URN =
       "beam:coder:state_backed_iterable:v1";
@@ -91,7 +90,6 @@ public class ModelCoders {
           WINDOWED_VALUE_CODER_URN,
           DOUBLE_CODER_URN,
           ROW_CODER_URN,
-          SCHEMA_CODER_URN,
           PARAM_WINDOWED_VALUE_CODER_URN,
           STATE_BACKED_ITERABLE_CODER_URN,
           SHARDED_KEY_CODER_URN,
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
index 64c7898a37b..f7969621436 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
@@ -189,7 +189,6 @@ public class RehydratedComponents {
         windowingStrategies.asMap(),
         coders.asMap(),
         Collections.emptyMap(),
-        requirements,
-        pipeline.getOptions());
+        requirements);
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
index 6288649aba3..446697f24a8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
@@ -63,7 +63,6 @@ public class SdkComponents {
   private final BiMap<Environment, String> environmentIds = HashBiMap.create();
   private final BiMap<RunnerApi.Coder, String> coderProtoToId = 
HashBiMap.create();
   private final Set<String> requirements;
-  private final PipelineOptions pipelineOptions;
 
   private final Set<String> reservedIds = new HashSet<>();
 
@@ -72,7 +71,17 @@ public class SdkComponents {
 
   /** Create a new {@link SdkComponents} with no components. */
   public static SdkComponents create() {
-    return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, 
"", null);
+    return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, 
"");
+  }
+
+  /**
+   * Create new {@link SdkComponents} importing all items from provided {@link 
Components} object.
+   *
+   * <p>WARNING: This action might cause some of duplicate items created.
+   */
+  public static SdkComponents create(
+      RunnerApi.Components components, Collection<String> requirements) {
+    return new SdkComponents(components, requirements, "");
   }
 
   /*package*/ static SdkComponents create(
@@ -82,9 +91,8 @@ public class SdkComponents {
       Map<String, WindowingStrategy<?, ?>> windowingStrategies,
       Map<String, Coder<?>> coders,
       Map<String, Environment> environments,
-      Collection<String> requirements,
-      PipelineOptions pipelineOptions) {
-    SdkComponents sdkComponents = new SdkComponents(components, requirements, 
"", pipelineOptions);
+      Collection<String> requirements) {
+    SdkComponents sdkComponents = SdkComponents.create(components, 
requirements);
     sdkComponents.transformIds.inverse().putAll(transforms);
     sdkComponents.pCollectionIds.inverse().putAll(pCollections);
     sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies);
@@ -95,28 +103,19 @@ public class SdkComponents {
 
   public static SdkComponents create(PipelineOptions options) {
     SdkComponents sdkComponents =
-        new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", 
options);
+        new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "");
     PortablePipelineOptions portablePipelineOptions = 
options.as(PortablePipelineOptions.class);
     sdkComponents.registerEnvironment(
         Environments.createOrGetDefaultEnvironment(portablePipelineOptions));
     return sdkComponents;
   }
 
-  public static SdkComponents create(PipelineOptions options, Environment 
environment) {
-    SdkComponents sdkComponents =
-        new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", 
options);
-    sdkComponents.registerEnvironment(environment);
-    return sdkComponents;
-  }
-
   private SdkComponents(
       @Nullable Components components,
       @Nullable Collection<String> requirements,
-      String newIdPrefix,
-      @Nullable PipelineOptions pipelineOptions) {
+      String newIdPrefix) {
     this.newIdPrefix = newIdPrefix;
     this.requirements = new HashSet<>();
-    this.pipelineOptions = pipelineOptions;
 
     if (components == null) {
       if (requirements != null) {
@@ -154,7 +153,7 @@ public class SdkComponents {
    */
   public SdkComponents withNewIdPrefix(String newIdPrefix) {
     SdkComponents sdkComponents =
-        new SdkComponents(componentsBuilder.build(), requirements, 
newIdPrefix, pipelineOptions);
+        new SdkComponents(componentsBuilder.build(), requirements, 
newIdPrefix);
     sdkComponents.transformIds.putAll(transformIds);
     sdkComponents.pCollectionIds.putAll(pCollectionIds);
     sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds);
@@ -175,7 +174,7 @@ public class SdkComponents {
       throws IOException {
     String name = getApplicationName(appliedPTransform);
     // If this transform is present in the components, nothing to do. return 
the existing name.
-    // Otherwise, the transform must be translated and added to the components.
+    // Otherwise the transform must be translated and added to the components.
     if (componentsBuilder.getTransformsOrDefault(name, null) != null) {
       return name;
     }
@@ -376,8 +375,4 @@ public class SdkComponents {
   public Collection<String> requirements() {
     return ImmutableSet.copyOf(requirements);
   }
-
-  public PipelineOptions getPipelineOptions() {
-    return pipelineOptions;
-  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
index 1ec0a74f5be..b8f92ff0053 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.not;
 
-import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -46,20 +45,14 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.SchemaCoder;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.ShardedKey;
 import 
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -77,34 +70,6 @@ import org.junit.runners.Parameterized.Parameters;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class CoderTranslationTest {
-  @AutoValue
-  @DefaultSchema(AutoValueSchema.class)
-  public abstract static class SimpleAutoValue {
-    public abstract String getString();
-
-    public abstract int getInt32();
-
-    public abstract long getInt64();
-
-    public static SimpleAutoValue of(String string, Integer int32, Long int64) 
{
-      return new AutoValue_CoderTranslationTest_SimpleAutoValue(string, int32, 
int64);
-    }
-  }
-
-  private static final SchemaRegistry REGISTRY = 
SchemaRegistry.createDefault();
-
-  private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) {
-    try {
-      return SchemaCoder.of(
-          REGISTRY.getSchema(typeDescriptor),
-          typeDescriptor,
-          REGISTRY.getToRowFunction(typeDescriptor),
-          REGISTRY.getFromRowFunction(typeDescriptor));
-    } catch (NoSuchSchemaException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   private static final Set<Coder<?>> KNOWN_CODERS =
       ImmutableSet.<Coder<?>>builder()
           .add(ByteArrayCoder.of())
@@ -129,7 +94,6 @@ public class CoderTranslationTest {
                       Field.of("array", FieldType.array(FieldType.STRING)),
                       Field.of("map", FieldType.map(FieldType.STRING, 
FieldType.INT32)),
                       Field.of("bar", 
FieldType.logicalType(FixedBytes.of(123))))))
-          .add(schemaCoderFrom(TypeDescriptor.of(SimpleAutoValue.class)))
           .add(ShardedKey.Coder.of(StringUtf8Coder.of()))
           .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of()))
           .add(NullableCoder.of(ByteArrayCoder.of()))
@@ -163,7 +127,7 @@ public class CoderTranslationTest {
     }
 
     @Test
-    public void validateModelCoderTranslators() {
+    public void validateCoderTranslators() {
       assertThat(
           "Every Model Coder must have a Translator",
           new ModelCoderRegistrar().getCoderURNs().keySet(),
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index c93de201479..6ecb029c5d9 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -600,7 +600,7 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
           pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read");
     } else {
       LOG.warn(
-          "Using use_deprecated_read in portable runners is runner-dependent. 
The "
+          "Using use_depreacted_read in portable runners is runner-dependent. 
The "
               + "ExpansionService will respect that, but if your runner does 
not have support for "
               + "native Read transform, your Pipeline will fail during 
Pipeline submission.");
     }
diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
index 8bd18fd8e25..14ab48f6669 100644
--- 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
+++ 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
@@ -21,11 +21,9 @@ import com.google.auto.service.AutoService;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.construction.CoderTranslator;
 import org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Coder registrar for AvroGenericCoder. */
 @AutoService(CoderTranslatorRegistrar.class)
@@ -44,20 +42,4 @@ public class AvroGenericCoderRegistrar implements 
CoderTranslatorRegistrar {
   public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> 
getCoderTranslators() {
     return ImmutableMap.of(AvroGenericCoder.class, new 
AvroGenericCoderTranslator());
   }
-
-  @Override
-  public boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
-    return coder.getClass() == AvroGenericCoder.class;
-  }
-
-  @Override
-  public @Nullable CoderTranslator<? extends Coder> getCoderTranslator(
-      Class<? extends Coder> coderClass) {
-    return coderClass == AvroGenericCoder.class ? new 
AvroGenericCoderTranslator() : null;
-  }
-
-  @Override
-  public @Nullable Class<? extends Coder> getCoderForUrn(String coderUrn) {
-    return AVRO_CODER_URN.equals(coderUrn) ? AvroGenericCoder.class : null;
-  }
 }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
index 42a6f8d11c2..ef8d69bc1ec 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
@@ -38,7 +38,6 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.sdk.coders.IterableLikeCoder;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
@@ -53,7 +52,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -302,26 +300,6 @@ public class StateBackedIterable<T>
         getCoderTranslators() {
       return ImmutableMap.of(StateBackedIterable.Coder.class, new 
Translator());
     }
-
-    @Override
-    public boolean isKnownCoder(
-        org.apache.beam.sdk.coders.Coder<?> coder, PipelineOptions options) {
-      return coder.getClass() == StateBackedIterable.Coder.class;
-    }
-
-    @Override
-    public @Nullable CoderTranslator<? extends 
org.apache.beam.sdk.coders.Coder> getCoderTranslator(
-        Class<? extends org.apache.beam.sdk.coders.Coder> coderClass) {
-      return coderClass == StateBackedIterable.Coder.class ? new Translator() 
: null;
-    }
-
-    @Override
-    public @Nullable Class<? extends org.apache.beam.sdk.coders.Coder> 
getCoderForUrn(
-        String coderUrn) {
-      return STATE_BACKED_ITERABLE_CODER_URN.equals(coderUrn)
-          ? StateBackedIterable.Coder.class
-          : null;
-    }
   }
 
   /**

Reply via email to