This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 357fd262211 Revert #37631 and #38497 on HEAD (#38516)
357fd262211 is described below
commit 357fd2622114ccce99726baa5e9735895e5392ed
Author: Yi Hu <[email protected]>
AuthorDate: Fri May 15 17:02:41 2026 -0400
Revert #37631 and #38497 on HEAD (#38516)
* Revert "Adds a new coder translator for Java SchemaCoder. (#37631)"
This reverts commit 81769cbfd132ed3c257685fbdc87f076b903e9f5.
* Revert "Sickbay two failed tests due to new schema coder urn. (#38497)"
This reverts commit 3dbd7c8c35b6a396c5e6c6fed2a3b37d4f731252.
---
.../beam_PostCommit_Java_ValidatesRunner_ULR.json | 3 +-
CHANGES.md | 5 +-
.../dataflow/DataflowPipelineTranslator.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 59 +++++++---------
.../dataflow/DataflowPipelineTranslatorTest.java | 77 ++++-----------------
.../control/ProcessBundleDescriptorsTest.java | 6 +-
runners/portability/java/build.gradle | 5 --
.../sdk/util/construction/CoderTranslation.java | 62 ++---------------
.../sdk/util/construction/CoderTranslator.java | 1 -
.../construction/CoderTranslatorRegistrar.java | 16 -----
.../sdk/util/construction/CoderTranslators.java | 79 ----------------------
.../sdk/util/construction/ModelCoderRegistrar.java | 28 ++------
.../beam/sdk/util/construction/ModelCoders.java | 2 -
.../util/construction/RehydratedComponents.java | 3 +-
.../beam/sdk/util/construction/SdkComponents.java | 39 +++++------
.../util/construction/CoderTranslationTest.java | 38 +----------
.../sdk/expansion/service/ExpansionService.java | 2 +-
.../extensions/avro/AvroGenericCoderRegistrar.java | 18 -----
.../beam/fn/harness/state/StateBackedIterable.java | 22 ------
19 files changed, 73 insertions(+), 394 deletions(-)
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
index fbd81891f93..6e2f429dd24 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_ULR.json
@@ -2,6 +2,5 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to
run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should
run this test",
- "https://github.com/apache/beam/pull/35159": "moving WindowedValue and
making an interface",
- "https://github.com/apache/beam/pull/38497": "sickbay two failed tests"
+ "https://github.com/apache/beam/pull/35159": "moving WindowedValue and
making an interface"
}
diff --git a/CHANGES.md b/CHANGES.md
index ca911e52a7a..52475a99d8e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -85,9 +85,6 @@
## Breaking Changes
-* Portable Java SDK now encodes SchemaCoders in a portable way
([#34672](https://github.com/apache/beam/issues/34672)).
- - Original custom Java coder encoding can still be obtained using
[StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47)
([#34672](https://github.com/apache/beam/issues/34672)).
- - Fixes ([#36496](https://github.com/apache/beam/issues/36496)),
([#30276](https://github.com/apache/beam/issues/30276)),
([#29245](https://github.com/apache/beam/issues/29245)).
* (Python) Made Beartype the default fallback type checking tool. This can be
disabled with the `--disable_beartype` pipeline option.
([#38275](https://github.com/apache/beam/issues/38275))
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
@@ -2441,4 +2438,4 @@ Schema Options, it will be removed in version `2.23.0`.
([BEAM-9704](https://iss
## Highlights
-- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
+- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
\ No newline at end of file
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1609cf6ea23..4016f31a547 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -221,7 +221,7 @@ public class DataflowPipelineTranslator {
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
- SdkComponents sdkComponents = SdkComponents.create(options);
+ SdkComponents sdkComponents = SdkComponents.create();
String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c19673a3117..299e7fa21ed 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1333,20 +1333,19 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways,
depending on whether
- // it is a portable or non-portable job submission.
+ // it is a v1 or v2 job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
- // The SdkComponents for portable and non-portable job submission must be
kept distinct. Both
+ // The SdkComponents for portable and non-portable job submission must be
kept distinct. Both
// need the default environment.
- SdkComponents portableComponents =
- SdkComponents.create(
- options,
- defaultEnvironmentForDataflow
- .toBuilder()
- .addAllDependencies(getDefaultArtifacts())
- .addAllCapabilities(Environments.getJavaCapabilities())
- .build());
+ SdkComponents portableComponents = SdkComponents.create();
+ portableComponents.registerEnvironment(
+ defaultEnvironmentForDataflow
+ .toBuilder()
+ .addAllDependencies(getDefaultArtifacts())
+ .addAllCapabilities(Environments.getJavaCapabilities())
+ .build());
RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
@@ -1375,30 +1374,28 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
if (useUnifiedWorker(options)) {
- LOG.info(
- "Skipping non-portable transform replacements since job will run on
portable worker.");
+ LOG.info("Skipping v1 transform replacements since job will run on v2.");
} else {
- // Now rewrite things to be as needed for non-portable (mutates the
pipeline).
- // This way the job submitted is valid for portable and non-portable,
simultaneously.
+ // Now rewrite things to be as needed for v1 (mutates the pipeline)
+ // This way the job submitted is valid for v1 and v2, simultaneously
replaceV1Transforms(pipeline);
}
- // Capture the SdkComponents for look up during step translations.
- SdkComponents dataflowNonPortableComponents =
- SdkComponents.create(
- options,
- defaultEnvironmentForDataflow
- .toBuilder()
- .addAllDependencies(getDefaultArtifacts())
- .addAllCapabilities(Environments.getJavaCapabilities())
- .build());
- // No need to perform transform upgrading for the non-portable runner
proto.
- RunnerApi.Pipeline dataflowNonPortablePipelineProto =
- PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents,
true, false);
+ // Capture the SdkComponents for look up during step translations
+ SdkComponents dataflowV1Components = SdkComponents.create();
+ dataflowV1Components.registerEnvironment(
+ defaultEnvironmentForDataflow
+ .toBuilder()
+ .addAllDependencies(getDefaultArtifacts())
+ .addAllCapabilities(Environments.getJavaCapabilities())
+ .build());
+ // No need to perform transform upgrading for the Runner v1 proto.
+ RunnerApi.Pipeline dataflowV1PipelineProto =
+ PipelineTranslation.toProto(pipeline, dataflowV1Components, true,
false);
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Dataflow non-portable worker pipeline proto:\n{}",
-
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
+ "Dataflow v1 pipeline proto:\n{}",
+ TextFormat.printer().printToString(dataflowV1PipelineProto));
}
// Set a unique client_request_id in the CreateJob request.
@@ -1418,11 +1415,7 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
JobSpecification jobSpecification =
translator.translate(
- pipeline,
- dataflowNonPortablePipelineProto,
- dataflowNonPortableComponents,
- this,
- packages);
+ pipeline, dataflowV1PipelineProto, dataflowV1Components, this,
packages);
if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) &&
!useUnifiedWorker(options)) {
List<String> experiments =
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 953f7a638ed..f8818931a68 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -47,7 +47,6 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -93,8 +92,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
@@ -169,11 +166,15 @@ public class DataflowPipelineTranslatorTest implements
Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
private SdkComponents createSdkComponents(PipelineOptions options) {
+ SdkComponents sdkComponents = SdkComponents.create();
+
String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(containerImageURL);
- return SdkComponents.create(options, defaultEnvironmentForDataflow);
+
+ sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
+ return sdkComponents;
}
// A Custom Mockito matcher for an initial Job that checks that all
@@ -1293,16 +1294,15 @@ public class DataflowPipelineTranslatorTest implements
Serializable {
file1.deleteOnExit();
File file2 = File.createTempFile("file2-", ".txt");
file2.deleteOnExit();
- SdkComponents sdkComponents =
- SdkComponents.create(
- options,
-
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
- .toBuilder()
- .addAllDependencies(
- Environments.getArtifacts(
- ImmutableList.of("file1.txt=" + file1, "file2.txt=" +
file2)))
- .addAllCapabilities(Environments.getJavaCapabilities())
- .build());
+ SdkComponents sdkComponents = SdkComponents.create();
+ sdkComponents.registerEnvironment(
+
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
+ .toBuilder()
+ .addAllDependencies(
+ Environments.getArtifacts(
+ ImmutableList.of("file1.txt=" + file1, "file2.txt=" +
file2)))
+ .addAllCapabilities(Environments.getJavaCapabilities())
+ .build());
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline,
sdkComponents, true);
@@ -1870,53 +1870,4 @@ public class DataflowPipelineTranslatorTest implements
Serializable {
return null;
}
}
-
- @AutoValue
- @DefaultSchema(AutoValueSchema.class)
- public abstract static class SimpleAutoValue {
- public abstract String getString();
-
- public abstract int getInt32();
-
- public abstract long getInt64();
-
- public static DataflowPipelineTranslatorTest.SimpleAutoValue of(
- String string, int int32, long int64) {
- return new
AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
- }
- }
-
- @Test
- public void testSchemaCoderTranslation() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- Pipeline pipeline = Pipeline.create(options);
- pipeline
- .apply(Impulse.create())
- .apply(
- MapElements.via(
- new SimpleFunction<byte[], SimpleAutoValue>() {
- @Override
- public SimpleAutoValue apply(byte[] input) {
- return SimpleAutoValue.of("foo", 5, 10L);
- }
- }))
- .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
- {
- SdkComponents sdkComponents = createSdkComponents(options);
- RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline,
sdkComponents, true);
- Map<String, RunnerApi.Coder> coders =
pipelineProto.getComponents().getCodersMap();
- assertTrue(coders.containsKey("SchemaCoder"));
- assertEquals("beam:coder:schema:v1",
coders.get("SchemaCoder").getSpec().getUrn());
- }
-
- // Prior to version 2.74, SchemaCoders are translated as custom java
coders.
- {
- options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
- SdkComponents sdkComponents = createSdkComponents(options);
- RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline,
sdkComponents, true);
- Map<String, RunnerApi.Coder> coders =
pipelineProto.getComponents().getCodersMap();
- assertTrue(coders.containsKey("SchemaCoder"));
- assertEquals("beam:coders:javasdk:0.1",
coders.get("SchemaCoder").getSpec().getUrn());
- }
- }
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
index 9ea7404053d..21d7550c38b 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java
@@ -78,8 +78,7 @@ public class ProcessBundleDescriptorsTest implements
Serializable {
// Add another stateful stage with a non-standard key coder
Pipeline p = Pipeline.create();
Coder<Void> keycoder = VoidCoder.of();
- ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
- assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()),
is(false));
+ assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
p.apply("impulse", Impulse.create())
.apply(
"create",
@@ -166,8 +165,7 @@ public class ProcessBundleDescriptorsTest implements
Serializable {
public void testLengthPrefixingOfInputCoderExecutableStage() throws
Exception {
Pipeline p = Pipeline.create();
Coder<Void> voidCoder = VoidCoder.of();
- ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
- assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()),
is(false));
+ assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
p.apply("impulse", Impulse.create())
.apply(
ParDo.of(
diff --git a/runners/portability/java/build.gradle
b/runners/portability/java/build.gradle
index aa147e8426c..6e3b431e802 100644
--- a/runners/portability/java/build.gradle
+++ b/runners/portability/java/build.gradle
@@ -214,11 +214,6 @@ def createUlrValidatesRunnerTask = { name,
environmentType, dockerImageTask = ""
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
- // TODO(https://github.com/apache/beam/issues/33859): Failed with
"KeyError: 'beam:coder:schema:v1'".
- // New schema coder urn is not yet supported in runners other than
dataflow
- excludeTestsMatching
'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithShuffle'
- excludeTestsMatching
'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithoutShuffle'
-
for (String test : sickbayTests) {
excludeTestsMatching test
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
index 2cc4bf0c6a0..22859dc68b9 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
@@ -25,13 +25,11 @@ import java.util.ServiceLoader;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.dataflow.qual.Deterministic;
@@ -64,8 +62,6 @@ public class CoderTranslation {
private static @MonotonicNonNull BiMap<Class<? extends Coder>, String>
knownCoderUrns;
- private static @MonotonicNonNull List<CoderTranslatorRegistrar>
coderTranslatorRegistrars;
-
private static @MonotonicNonNull Map<Class<? extends Coder>,
CoderTranslator<? extends Coder>>
knownTranslators;
@@ -84,53 +80,6 @@ public class CoderTranslation {
return knownCoderUrns;
}
- private static void initializeCoderTranslatorRegistrars() {
- ImmutableList.Builder<CoderTranslatorRegistrar> registrars =
ImmutableList.builder();
- for (CoderTranslatorRegistrar coderTranslatorRegistrar :
- ServiceLoader.load(CoderTranslatorRegistrar.class)) {
- registrars.add(coderTranslatorRegistrar);
- }
- coderTranslatorRegistrars = registrars.build();
- }
-
- static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
- if (coderTranslatorRegistrars == null) {
- initializeCoderTranslatorRegistrars();
- }
- for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
- if (registrar.isKnownCoder(coder, options)) {
- return true;
- }
- }
- return false;
- }
-
- static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends
Coder> coderClass) {
- if (coderTranslatorRegistrars == null) {
- initializeCoderTranslatorRegistrars();
- }
- for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
- CoderTranslator translator = registrar.getCoderTranslator(coderClass);
- if (translator != null) {
- return translator;
- }
- }
- return null;
- }
-
- static Class<? extends Coder> getCoderForUrn(String coderUrn) {
- if (coderTranslatorRegistrars == null) {
- initializeCoderTranslatorRegistrars();
- }
- for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
- Class<? extends Coder> coder = registrar.getCoderForUrn(coderUrn);
- if (coder != null) {
- return coder;
- }
- }
- return null;
- }
-
@VisibleForTesting
@Deterministic
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
getKnownTranslators() {
@@ -158,7 +107,7 @@ public class CoderTranslation {
public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents
components)
throws IOException {
- if (isKnownCoder(coder, components.getPipelineOptions())) {
+ if (getKnownCoderUrns().containsKey(coder.getClass())) {
return toKnownCoder(coder, components);
}
@@ -180,10 +129,7 @@ public class CoderTranslation {
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents
components)
throws IOException {
- CoderTranslator translator = getCoderTranslator(coder.getClass());
- if (translator == null) {
- throw new IOException("Unable to find CoderTranslator for known Coder");
- }
+ CoderTranslator translator = getKnownTranslators().get(coder.getClass());
List<String> componentIds = registerComponents(coder, translator,
components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
@@ -240,8 +186,8 @@ public class CoderTranslation {
components.getComponents().getCodersOrThrow(componentId),
components, context);
coderComponents.add(innerCoder);
}
- Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
- CoderTranslator<?> translator = getCoderTranslator(coderType);
+ Class<? extends Coder> coderType =
getKnownCoderUrns().inverse().get(coderUrn);
+ CoderTranslator<?> translator = getKnownTranslators().get(coderType);
if (translator != null) {
return translator.fromComponents(
coderComponents, coder.getSpec().getPayload().toByteArray(),
context);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
index 78f5b61c0f0..3d89c4c7ff4 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
@@ -28,7 +28,6 @@ import
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext
* additional payload, which is not currently supported. This exists as a
temporary measure.
*/
public interface CoderTranslator<T extends Coder<?>> {
-
/** Extract all component {@link Coder coders} within a coder. */
List<? extends Coder<?>> getComponents(T from);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
index 44e8c2956ae..b69d0290de5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.util.construction;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.checkerframework.checker.nullness.qual.Nullable;
/** A registrar of {@link Coder} URNs to the associated {@link
CoderTranslator}. */
@SuppressWarnings({
@@ -36,18 +34,4 @@ public interface CoderTranslatorRegistrar {
/** Returns a mapping of URN to {@link CoderTranslator}. */
Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
getCoderTranslators();
-
- /**
- * Returns whether the given Coder is known to this
CoderTranslatorRegistrar. If the Coder is
- * known, then getCoderTranslator() will return a non-null CoderTranslator.
- */
- boolean isKnownCoder(Coder<?> coder, PipelineOptions options);
-
- /** Returns the CoderTranslator to use for this Coder, or null if the Coder
is not known. */
- @Nullable
- CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder>
coderClass);
-
- /** Returns the Coder to use for the given Urn, or null if the Urn is for an
unknown Coder. */
- @Nullable
- Class<? extends Coder> getCoderForUrn(String coderUrn);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
index a847bf780df..84a90721a98 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util.construction;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.model.pipeline.v1.SchemaApi;
@@ -31,19 +30,12 @@ import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.ShardedKey;
-import
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -185,77 +177,6 @@ class CoderTranslators {
};
}
- static <T> CoderTranslator<SchemaCoder<T>> schema() {
- return new CoderTranslator<SchemaCoder<T>>() {
- private static final String TO_ROW_FUNCTION_URN =
"beam:torowfn:javasdk:v1";
- private static final String FROM_ROW_FUNCTION_URN =
"beam:fromrowfn:javasdk:v1";
- private static final String TYPE_DESCRIPTOR_URN =
"beam:typedescriptor:javasdk:v1";
-
- @Override
- public ImmutableList<? extends Coder<?>> getComponents(SchemaCoder<T>
from) {
- return ImmutableList.of();
- }
-
- @Override
- public byte[] getPayload(SchemaCoder<T> from) {
- SchemaApi.SchemaCoderPayload.Builder payload =
SchemaApi.SchemaCoderPayload.newBuilder();
- payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(),
true));
- payload
- .getToRowFnBuilder()
- .setUrn(TO_ROW_FUNCTION_URN)
- .setPayload(
- ByteString.copyFrom(
-
SerializableUtils.serializeToByteArray(from.getToRowFunction())));
- payload
- .getFromRowFnBuilder()
- .setUrn(FROM_ROW_FUNCTION_URN)
- .setPayload(
- ByteString.copyFrom(
-
SerializableUtils.serializeToByteArray(from.getFromRowFunction())));
- payload
- .addAdditionalCoderInfosBuilder()
- .setUrn(TYPE_DESCRIPTOR_URN)
- .setPayload(
- ByteString.copyFrom(
-
SerializableUtils.serializeToByteArray(from.getEncodedTypeDescriptor())));
- return payload.build().toByteArray();
- }
-
- @Override
- public SchemaCoder<T> fromComponents(
- List<Coder<?>> components, byte[] payload, TranslationContext
context) {
- checkArgument(
- components.isEmpty(), "Expected empty component list, but
received: %s", components);
- try {
- SchemaApi.SchemaCoderPayload schemaCoderPayload =
- SchemaApi.SchemaCoderPayload.parseFrom(payload);
- if (schemaCoderPayload.getAdditionalCoderInfosCount() == 0) {
- throw new IllegalArgumentException("Missing serialized
typeDescriptor");
- }
- TypeDescriptor<T> typeDescriptor =
- (TypeDescriptor<T>)
- SerializableUtils.deserializeFromByteArray(
-
schemaCoderPayload.getAdditionalCoderInfos(0).getPayload().toByteArray(),
- "typeDescriptor");
- SerializableFunction<T, Row> toRowFunction =
- (SerializableFunction<T, Row>)
- SerializableUtils.deserializeFromByteArray(
-
schemaCoderPayload.getToRowFn().getPayload().toByteArray(), "toRowFunction");
- SerializableFunction<Row, T> fromRowFunction =
- (SerializableFunction<Row, T>)
- SerializableUtils.deserializeFromByteArray(
-
schemaCoderPayload.getFromRowFn().getPayload().toByteArray(),
- "fromRowFunction");
-
- Schema schema =
SchemaTranslation.schemaFromProto(schemaCoderPayload.getSchema());
- return SchemaCoder.of(schema, typeDescriptor, toRowFunction,
fromRowFunction);
- } catch (IOException | IllegalArgumentException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
static CoderTranslator<ShardedKey.Coder<?>> shardedKey() {
return new SimpleStructuredCoderTranslator<ShardedKey.Coder<?>>() {
@Override
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
index 1f9f1eaafbe..5b0d5aedd61 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
@@ -34,9 +34,6 @@ import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.ShardedKey;
@@ -74,7 +71,6 @@ public class ModelCoderRegistrar implements
CoderTranslatorRegistrar {
ModelCoders.PARAM_WINDOWED_VALUE_CODER_URN)
.put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
.put(RowCoder.class, ModelCoders.ROW_CODER_URN)
- .put(SchemaCoder.class, ModelCoders.SCHEMA_CODER_URN)
.put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN)
.put(TimestampPrefixingWindowCoder.class,
ModelCoders.CUSTOM_WINDOW_CODER_URN)
.put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN)
@@ -100,7 +96,6 @@ public class ModelCoderRegistrar implements
CoderTranslatorRegistrar {
CoderTranslators.paramWindowedValue())
.put(DoubleCoder.class,
CoderTranslators.atomic(DoubleCoder.class))
.put(RowCoder.class, CoderTranslators.row())
- .put(SchemaCoder.class, CoderTranslators.schema())
.put(ShardedKey.Coder.class, CoderTranslators.shardedKey())
.put(TimestampPrefixingWindowCoder.class,
CoderTranslators.timestampPrefixingWindow())
.put(NullableCoder.class, CoderTranslators.nullable())
@@ -128,6 +123,10 @@ public class ModelCoderRegistrar implements
CoderTranslatorRegistrar {
Coder.class.getSimpleName());
}
+ public static boolean isKnownCoder(Coder<?> coder) {
+ return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
+ }
+
@Override
public Map<Class<? extends Coder>, String> getCoderURNs() {
return BEAM_MODEL_CODER_URNS;
@@ -137,23 +136,4 @@ public class ModelCoderRegistrar implements
CoderTranslatorRegistrar {
public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
getCoderTranslators() {
return BEAM_MODEL_CODERS;
}
-
- @Override
- public boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
- if (coder.getClass() == SchemaCoder.class
- && StreamingOptions.updateCompatibilityVersionLessThan(options,
"2.74")) {
- return false;
- }
- return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
- }
-
- @Override
- public CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends
Coder> coderClass) {
- return BEAM_MODEL_CODERS.getOrDefault(coderClass, null);
- }
-
- @Override
- public Class<? extends Coder> getCoderForUrn(String coderUrn) {
- return BEAM_MODEL_CODER_URNS.inverse().getOrDefault(coderUrn, null);
- }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
index 5059cc1c6b8..7b7546aceb6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
@@ -61,7 +61,6 @@ public class ModelCoders {
getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE);
public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW);
- public static final String SCHEMA_CODER_URN =
getUrn(StandardCoders.Enum.SCHEMA);
public static final String STATE_BACKED_ITERABLE_CODER_URN =
"beam:coder:state_backed_iterable:v1";
@@ -91,7 +90,6 @@ public class ModelCoders {
WINDOWED_VALUE_CODER_URN,
DOUBLE_CODER_URN,
ROW_CODER_URN,
- SCHEMA_CODER_URN,
PARAM_WINDOWED_VALUE_CODER_URN,
STATE_BACKED_ITERABLE_CODER_URN,
SHARDED_KEY_CODER_URN,
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
index 64c7898a37b..f7969621436 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
@@ -189,7 +189,6 @@ public class RehydratedComponents {
windowingStrategies.asMap(),
coders.asMap(),
Collections.emptyMap(),
- requirements,
- pipeline.getOptions());
+ requirements);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
index 6288649aba3..446697f24a8 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
@@ -63,7 +63,6 @@ public class SdkComponents {
private final BiMap<Environment, String> environmentIds = HashBiMap.create();
private final BiMap<RunnerApi.Coder, String> coderProtoToId =
HashBiMap.create();
private final Set<String> requirements;
- private final PipelineOptions pipelineOptions;
private final Set<String> reservedIds = new HashSet<>();
@@ -72,7 +71,17 @@ public class SdkComponents {
/** Create a new {@link SdkComponents} with no components. */
public static SdkComponents create() {
- return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null,
"", null);
+ return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null,
"");
+ }
+
+ /**
+ * Create new {@link SdkComponents} importing all items from provided {@link
Components} object.
+ *
+ * <p>WARNING: This action might cause some of duplicate items created.
+ */
+ public static SdkComponents create(
+ RunnerApi.Components components, Collection<String> requirements) {
+ return new SdkComponents(components, requirements, "");
}
/*package*/ static SdkComponents create(
@@ -82,9 +91,8 @@ public class SdkComponents {
Map<String, WindowingStrategy<?, ?>> windowingStrategies,
Map<String, Coder<?>> coders,
Map<String, Environment> environments,
- Collection<String> requirements,
- PipelineOptions pipelineOptions) {
- SdkComponents sdkComponents = new SdkComponents(components, requirements,
"", pipelineOptions);
+ Collection<String> requirements) {
+ SdkComponents sdkComponents = SdkComponents.create(components,
requirements);
sdkComponents.transformIds.inverse().putAll(transforms);
sdkComponents.pCollectionIds.inverse().putAll(pCollections);
sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies);
@@ -95,28 +103,19 @@ public class SdkComponents {
public static SdkComponents create(PipelineOptions options) {
SdkComponents sdkComponents =
- new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "",
options);
+ new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "");
PortablePipelineOptions portablePipelineOptions =
options.as(PortablePipelineOptions.class);
sdkComponents.registerEnvironment(
Environments.createOrGetDefaultEnvironment(portablePipelineOptions));
return sdkComponents;
}
- public static SdkComponents create(PipelineOptions options, Environment
environment) {
- SdkComponents sdkComponents =
- new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "",
options);
- sdkComponents.registerEnvironment(environment);
- return sdkComponents;
- }
-
private SdkComponents(
@Nullable Components components,
@Nullable Collection<String> requirements,
- String newIdPrefix,
- @Nullable PipelineOptions pipelineOptions) {
+ String newIdPrefix) {
this.newIdPrefix = newIdPrefix;
this.requirements = new HashSet<>();
- this.pipelineOptions = pipelineOptions;
if (components == null) {
if (requirements != null) {
@@ -154,7 +153,7 @@ public class SdkComponents {
*/
public SdkComponents withNewIdPrefix(String newIdPrefix) {
SdkComponents sdkComponents =
- new SdkComponents(componentsBuilder.build(), requirements,
newIdPrefix, pipelineOptions);
+ new SdkComponents(componentsBuilder.build(), requirements,
newIdPrefix);
sdkComponents.transformIds.putAll(transformIds);
sdkComponents.pCollectionIds.putAll(pCollectionIds);
sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds);
@@ -175,7 +174,7 @@ public class SdkComponents {
throws IOException {
String name = getApplicationName(appliedPTransform);
// If this transform is present in the components, nothing to do. return
the existing name.
- // Otherwise, the transform must be translated and added to the components.
+ // Otherwise the transform must be translated and added to the components.
if (componentsBuilder.getTransformsOrDefault(name, null) != null) {
return name;
}
@@ -376,8 +375,4 @@ public class SdkComponents {
public Collection<String> requirements() {
return ImmutableSet.copyOf(requirements);
}
-
- public PipelineOptions getPipelineOptions() {
- return pipelineOptions;
- }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
index 1ec0a74f5be..b8f92ff0053 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
-import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -46,20 +45,14 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.SchemaCoder;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.ShardedKey;
import
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -77,34 +70,6 @@ import org.junit.runners.Parameterized.Parameters;
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
public class CoderTranslationTest {
- @AutoValue
- @DefaultSchema(AutoValueSchema.class)
- public abstract static class SimpleAutoValue {
- public abstract String getString();
-
- public abstract int getInt32();
-
- public abstract long getInt64();
-
- public static SimpleAutoValue of(String string, Integer int32, Long int64)
{
- return new AutoValue_CoderTranslationTest_SimpleAutoValue(string, int32,
int64);
- }
- }
-
- private static final SchemaRegistry REGISTRY =
SchemaRegistry.createDefault();
-
- private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) {
- try {
- return SchemaCoder.of(
- REGISTRY.getSchema(typeDescriptor),
- typeDescriptor,
- REGISTRY.getToRowFunction(typeDescriptor),
- REGISTRY.getFromRowFunction(typeDescriptor));
- } catch (NoSuchSchemaException e) {
- throw new RuntimeException(e);
- }
- }
-
private static final Set<Coder<?>> KNOWN_CODERS =
ImmutableSet.<Coder<?>>builder()
.add(ByteArrayCoder.of())
@@ -129,7 +94,6 @@ public class CoderTranslationTest {
Field.of("array", FieldType.array(FieldType.STRING)),
Field.of("map", FieldType.map(FieldType.STRING,
FieldType.INT32)),
Field.of("bar",
FieldType.logicalType(FixedBytes.of(123))))))
- .add(schemaCoderFrom(TypeDescriptor.of(SimpleAutoValue.class)))
.add(ShardedKey.Coder.of(StringUtf8Coder.of()))
.add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of()))
.add(NullableCoder.of(ByteArrayCoder.of()))
@@ -163,7 +127,7 @@ public class CoderTranslationTest {
}
@Test
- public void validateModelCoderTranslators() {
+ public void validateCoderTranslators() {
assertThat(
"Every Model Coder must have a Translator",
new ModelCoderRegistrar().getCoderURNs().keySet(),
diff --git
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index c93de201479..6ecb029c5d9 100644
---
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -600,7 +600,7 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read");
} else {
LOG.warn(
- "Using use_deprecated_read in portable runners is runner-dependent.
The "
+ "Using use_depreacted_read in portable runners is runner-dependent.
The "
+ "ExpansionService will respect that, but if your runner does
not have support for "
+ "native Read transform, your Pipeline will fail during
Pipeline submission.");
}
diff --git
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
index 8bd18fd8e25..14ab48f6669 100644
---
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
+++
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderRegistrar.java
@@ -21,11 +21,9 @@ import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.construction.CoderTranslator;
import org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.Nullable;
/** Coder registrar for AvroGenericCoder. */
@AutoService(CoderTranslatorRegistrar.class)
@@ -44,20 +42,4 @@ public class AvroGenericCoderRegistrar implements
CoderTranslatorRegistrar {
public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
getCoderTranslators() {
return ImmutableMap.of(AvroGenericCoder.class, new
AvroGenericCoderTranslator());
}
-
- @Override
- public boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
- return coder.getClass() == AvroGenericCoder.class;
- }
-
- @Override
- public @Nullable CoderTranslator<? extends Coder> getCoderTranslator(
- Class<? extends Coder> coderClass) {
- return coderClass == AvroGenericCoder.class ? new
AvroGenericCoderTranslator() : null;
- }
-
- @Override
- public @Nullable Class<? extends Coder> getCoderForUrn(String coderUrn) {
- return AVRO_CODER_URN.equals(coderUrn) ? AvroGenericCoder.class : null;
- }
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
index 42a6f8d11c2..ef8d69bc1ec 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
@@ -38,7 +38,6 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
@@ -53,7 +52,6 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -302,26 +300,6 @@ public class StateBackedIterable<T>
getCoderTranslators() {
return ImmutableMap.of(StateBackedIterable.Coder.class, new
Translator());
}
-
- @Override
- public boolean isKnownCoder(
- org.apache.beam.sdk.coders.Coder<?> coder, PipelineOptions options) {
- return coder.getClass() == StateBackedIterable.Coder.class;
- }
-
- @Override
- public @Nullable CoderTranslator<? extends
org.apache.beam.sdk.coders.Coder> getCoderTranslator(
- Class<? extends org.apache.beam.sdk.coders.Coder> coderClass) {
- return coderClass == StateBackedIterable.Coder.class ? new Translator()
: null;
- }
-
- @Override
- public @Nullable Class<? extends org.apache.beam.sdk.coders.Coder>
getCoderForUrn(
- String coderUrn) {
- return STATE_BACKED_ITERABLE_CODER_URN.equals(coderUrn)
- ? StateBackedIterable.Coder.class
- : null;
- }
}
/**