kennknowles commented on code in PR #30332: URL: https://github.com/apache/beam/pull/30332#discussion_r1491897531
########## runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkAbstractAdapter.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.adapter; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.util.construction.NativeTransforms; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; + +public abstract class BeamFlinkAbstractAdapter<DataSetOrStream> { + protected final PipelineOptions pipelineOptions; Review Comment: My take here is that this superclass is sensible standalone logic that is not in a mutually-recursive relationship with the subclass. So remove the subclassing and pass the `{ getTypeInformation; createContext }` thing in to the logic here. Kind of a stylistic point, but still, protected fields where the values are passed through the super constructor anti-pattern called attention to it. Plus, then the stricter separation between Dataset / DataStream parameterization will be better, and this primary logic won't have it bleed in. Unless there is a mutual recursion I missed... which also I would have missed because the control flow between subclass and superclass is hard to follow, hence good to eliminate. ########## runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkAbstractAdapter.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.adapter; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.util.construction.NativeTransforms; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.util.construction.SdkComponents; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; + +public abstract class BeamFlinkAbstractAdapter<DataSetOrStream> { Review Comment: `T` suffix would be traditional to indicate it is a type variable (since java doesn't have lexicographic distinction) ########## runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.adapter; + +import java.io.IOException; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.construction.CoderTranslation; +import org.apache.beam.sdk.util.construction.RehydratedComponents; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public class BeamAdapterUtils { + private BeamAdapterUtils() {} + + @SuppressWarnings("nullness") Review Comment: This much cyclomatic complexity and you want to suppress null checks? 😛 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
