Sxnan commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1607724484
##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultJobInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.JobInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+/** Default implementation of {@link JobInfo}. */
+public class DefaultJobInfo implements JobInfo {
+    private final StreamingRuntimeContext operatorContext;
+
+    public DefaultJobInfo(StreamingRuntimeContext streamingRuntimeContext) {
+        this.operatorContext = streamingRuntimeContext;

Review Comment:
   I think it would be better to pass only the necessary info to `DefaultJobInfo`, as is done for `DefaultTaskInfo`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java:
##########
@@ -64,7 +68,26 @@ public JobInformation(
             Configuration jobConfiguration,
             Collection<PermanentBlobKey> requiredJarFileBlobKeys,
             Collection<URL> requiredClasspathURLs) {
+        this(
+                jobId,
+                JobType.STREAMING,
+                jobName,
+                serializedExecutionConfig,
+                jobConfiguration,
+                requiredJarFileBlobKeys,
+                requiredClasspathURLs);
+    }
+
+    public JobInformation(

Review Comment:
   I think the constructor without `jobType` is only used in tests. Instead of introducing a new constructor, how about just updating the existing one?

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##########
@@ -18,23 +18,61 @@
 package org.apache.flink.datastream.impl.context;

+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;

+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
     private final DefaultRuntimeContext context;

-    public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+    private final DefaultPartitionedContext partitionedContext;
+
+    private final Collector<OUT> collector;
+
+    private final boolean isKeyed;
+
+    private final Set<Object> keySet;
+
+    public DefaultNonPartitionedContext(
+            DefaultRuntimeContext context,
+            DefaultPartitionedContext partitionedContext,
+            Collector<OUT> collector,
+            boolean isKeyed,
+            Set<Object> keySet) {
         this.context = context;
+        this.partitionedContext = partitionedContext;
+        this.collector = collector;
+        this.isKeyed = isKeyed;
+        this.keySet = keySet;
     }

     @Override
-    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
-        // TODO implements this method.
+    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction)
+            throws Exception {
+        if (isKeyed) {
+            for (Object key : keySet) {

Review Comment:
   What happens to the `keySet` when the operator restores from a checkpoint? I think this method cannot work properly on a keyed stream without a state backend.

##########
flink-core-api/src/main/java/org/apache/flink/api/common/SlotSharingGroup.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Describe the name and the different resource components of a slot sharing group.
+ *
+ * <p>Two SlotSharingGroup classes currently exist in flink-core and flink-core-api, because the one
+ * in flink-core exposes components marked as internal, which we want to avoid in DataStream-V2 API.

Review Comment:
   We should probably add a UT for this `SlotSharingGroup`, similar to the legacy `SlotSharingGroup` UT.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##########
@@ -52,7 +54,14 @@ public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
     @Override
     public void open() throws Exception {
         super.open();
-        context = new DefaultRuntimeContext(getRuntimeContext());
+        StreamingRuntimeContext operatorContext = getRuntimeContext();
+        TaskInfo taskInfo = operatorContext.getTaskInfo();
+        context =
+                new DefaultRuntimeContext(
+                        operatorContext,

Review Comment:
   It feels inconsistent that both the operator context and the internals of `taskInfo` are passed to `DefaultRuntimeContext`. I think we should either pass the `operatorContext`, which already contains all the info, or pass only the required info extracted from it. The same applies to the other operators.
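   To make the "pass only the required info" option concrete, here is a rough sketch. It is not the PR's code: every type below is a simplified stand-in so the snippet compiles on its own, `FakeOperatorContext` and `fromOperatorContext` are hypothetical names, and the accessors on the `JobInfo` stand-in are assumptions for illustration. The point is only that the values are extracted once, where the operator context is available, and the info object keeps no reference to the context.

```java
import java.util.Objects;

/** Sketch only: all nested types are simplified stand-ins, not the real Flink classes. */
public class JobInfoSketch {

    /** Stand-in for a user-facing execution mode (assumed for illustration). */
    public enum ExecutionMode {
        STREAMING,
        BATCH
    }

    /** Stand-in for the user-facing JobInfo interface (accessors are assumptions). */
    public interface JobInfo {
        String getJobName();

        ExecutionMode getExecutionMode();
    }

    /** Hypothetical stand-in for the operator-level runtime context. */
    public static final class FakeOperatorContext {
        final String jobName;
        final boolean batch;

        public FakeOperatorContext(String jobName, boolean batch) {
            this.jobName = jobName;
            this.batch = batch;
        }
    }

    /** Holds only the values it needs; it never sees the operator context. */
    public static final class DefaultJobInfo implements JobInfo {
        private final String jobName;
        private final ExecutionMode executionMode;

        public DefaultJobInfo(String jobName, ExecutionMode executionMode) {
            this.jobName = Objects.requireNonNull(jobName);
            this.executionMode = Objects.requireNonNull(executionMode);
        }

        @Override
        public String getJobName() {
            return jobName;
        }

        @Override
        public ExecutionMode getExecutionMode() {
            return executionMode;
        }
    }

    /** Roughly what an operator's open() could do: extract once, then hand over plain values. */
    public static JobInfo fromOperatorContext(FakeOperatorContext ctx) {
        ExecutionMode mode = ctx.batch ? ExecutionMode.BATCH : ExecutionMode.STREAMING;
        return new DefaultJobInfo(ctx.jobName, mode);
    }

    public static void main(String[] args) {
        JobInfo info = fromOperatorContext(new FakeOperatorContext("wordcount", false));
        System.out.println(info.getJobName() + " / " + info.getExecutionMode());
    }
}
```

   With this shape the info class can be unit-tested without constructing a runtime context. The alternative (passing the whole operator context everywhere) is also fine, as long as the two styles are not mixed in one constructor.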
