fapaul commented on a change in pull request #16701:
URL: https://github.com/apache/flink/pull/16701#discussion_r682543690
##########
File path:
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -161,6 +163,12 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
return Optional.empty();
}
+ @Override
+ public Collection<String> getCompatibleStateNames() {
Review comment:
Personally, I am not a big fan of the `Collection` abstraction. Is this
commonly favored in Flink over exposing some more information and using a
`List`?
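To make the question concrete, here is a hypothetical sketch (the interfaces, class, and state names are invented for illustration; this is not Flink's `Sink` API). With a `List` return type, an ordering such as "try the newest legacy name first" becomes part of the contract, whereas a `Collection` only promises iteration, membership checks, and size.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the two return types; neither interface is Flink's Sink API. */
final class StateNameContracts {

    /** Weaker contract: callers can iterate and check membership/size, but not rely on order. */
    interface CollectionNames {
        default Collection<String> getCompatibleStateNames() {
            return Collections.emptyList();
        }
    }

    /** Stronger contract: the order is part of the type, so callers may rely on it. */
    interface ListNames {
        default List<String> getCompatibleStateNames() {
            return Collections.emptyList();
        }
    }

    /** Hypothetical drop-in replacement for two older sinks; the state names are invented. */
    static final class ReplacementSink implements ListNames {
        @Override
        public List<String> getCompatibleStateNames() {
            // Listed in the order in which they should be tried on restore.
            return Arrays.asList("legacy-sink-v2-state", "legacy-sink-v1-state");
        }
    }

    /** With a List, "the first entry wins" can be expressed directly via get(0). */
    static String firstCompatibleName(ListNames sink, String fallback) {
        List<String> names = sink.getCompatibleStateNames();
        return names.isEmpty() ? fallback : names.get(0);
    }
}
```

The trade-off is the usual one: `Collection` leaves implementations free to return a `Set`, while `List` exposes (and commits to) an order.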
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java
##########
@@ -33,44 +30,52 @@
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
* Committer} in the batch execution mode.
*
- * @param <CommT> The committable type of the {@link Committer}.
+ * @param <InputT> The committable type of the {@link Committer}.
*/
-final class BatchCommitterOperator<CommT> extends AbstractStreamOperator<CommT>
- implements OneInputStreamOperator<CommT, CommT>, BoundedOneInput {
+final class BatchCommitterHandler<InputT, OutputT>
+ extends AbstractCommitterHandler<InputT, OutputT> {
/** Responsible for committing the committable to the external system. */
- private final Committer<CommT> committer;
+ private final Committer<InputT> committer;
- /** Record all the committables until the end of the input. */
- private final List<CommT> allCommittables;
+ /**
+ * The committer that is chained to this committer. It's either {@link
+ * GlobalBatchCommitterHandler} or {@link NoopCommitterHandler}.
Review comment:
Wouldn't it be easier to just not instantiate a `GlobalBatchCommitter`
if it is not necessary, and not introduce the `NoopCommitterHandler`?
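A minimal sketch of that alternative, using simplified stand-in types (`Handler`, `BatchHandler`, `GlobalHandler`, and `create` are hypothetical, not Flink's classes, and the actual commit to the external system is omitted): the downstream handler is only instantiated when a global committer exists, so no no-op class is needed.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch with simplified stand-ins, not Flink's committer handler classes. */
final class HandlerChainSketch {

    /** Minimal stand-in for a committer handler. */
    interface Handler {
        List<String> process(List<String> committables);
    }

    /** Stand-in for a global batch committer handler: combines everything into one value. */
    static final class GlobalHandler implements Handler {
        @Override
        public List<String> process(List<String> committables) {
            return Collections.singletonList(String.join("+", committables));
        }
    }

    /** Stand-in for a batch committer handler whose downstream handler is optional. */
    static final class BatchHandler implements Handler {
        private final Optional<Handler> next;

        private BatchHandler(Optional<Handler> next) {
            this.next = next;
        }

        @Override
        public List<String> process(List<String> committables) {
            // Committing to the external system is omitted in this sketch.
            // Forward only if a downstream handler exists; otherwise emit nothing.
            return next.map(h -> h.process(committables)).orElse(Collections.<String>emptyList());
        }
    }

    /** Instantiates the downstream handler only when a global committer is configured. */
    static BatchHandler create(boolean hasGlobalCommitter) {
        Optional<Handler> next =
                hasGlobalCommitter
                        ? Optional.<Handler>of(new GlobalHandler())
                        : Optional.<Handler>empty();
        return new BatchHandler(next);
    }
}
```

One caveat for the real code: `CommitterHandler` extends `Serializable` in this PR, and `java.util.Optional` is not serializable, so a nullable field may be the more practical choice there.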
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperatorFactory.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.util.Optional;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * SinkOperator}.
+ *
+ * @param <InputT> The input type of the {@link SinkWriter}.
+ * @param <CommT> The committable type of the {@link SinkWriter}.
+ * @param <WriterStateT> The type of the {@link SinkWriter Writer's} state.
+ */
+public final class SinkOperatorFactory<InputT, CommT, WriterStateT>
Review comment:
I wonder if this class should be part of the previous commit; IMO it
belongs to the newly introduced operator model.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+ /** Initializes the state of the committer and this handler. */
+ default void initializeState(StateInitializationContext context) throws Exception {}
+
+ /** Snapshots the state of the committer and this handler. */
+ default void snapshotState(StateSnapshotContext context) throws Exception {}
+
+ /**
+ * Processes the committables be either directly transforming them or by adding them to the
Review comment:
`by either directly ...`
FYI, it seems GitHub suggestions are broken...
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * CommitterOperator}.
+ *
+ * @param <CommT> the type of the committable
+ * @param <GlobalCommT> the type of the global committable
+ */
+public final class CommitterOperatorFactory<CommT, GlobalCommT>
+ extends AbstractStreamOperatorFactory<byte[]>
+ implements OneInputStreamOperatorFactory<byte[], byte[]> {
+
+ private final Sink<?, CommT, ?, GlobalCommT> sink;
+ private final boolean batch;
+
+ public CommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink, boolean batch) {
+ this.sink = sink;
+ this.batch = batch;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<byte[]>> T createStreamOperator(
+ StreamOperatorParameters<byte[]> parameters) {
+
+ SimpleVersionedSerializer<CommT> committableSerializer =
+ sink.getCommittableSerializer().orElseThrow(this::noSerializerFound);
+ try {
+ CommitterHandler<CommT, GlobalCommT> committerHandler = getGlobalCommitterHandler();
+ if (batch) {
+ Optional<Committer<CommT>> committer = sink.createCommitter();
+ if (committer.isPresent()) {
+ committerHandler =
+ new BatchCommitterHandler<>(committer.get(), committerHandler);
+ }
+ }
+
+ checkState(
+ !(committerHandler instanceof NoopCommitterHandler),
+ "committer operator without commmitter");
+ final CommitterOperator<CommT, GlobalCommT> committerOperator =
+ new CommitterOperator<>(committableSerializer, committerHandler, null);
Review comment:
I prefer having a separate constructor when fields are nullable.
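For illustration, a hypothetical sketch of that style (`OperatorSketch` and its `String` fields are stand-ins; the type of the third `CommitterOperator` argument is not visible in this hunk). Callers pick the constructor that matches their case and never pass a `null` literal, while the field itself may stay nullable internally.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for an operator with one optional collaborator ("extra"). */
final class OperatorSketch {

    private final String serializer;
    private final String handler;
    // Nullable internally, but no caller ever passes null explicitly.
    private final String extra;

    /** Use when there is no extra collaborator. */
    OperatorSketch(String serializer, String handler) {
        this(serializer, handler, Optional.empty());
    }

    /** Use when the extra collaborator exists; Optional.of rejects null with an NPE. */
    OperatorSketch(String serializer, String handler, String extra) {
        this(serializer, handler, Optional.of(extra));
    }

    /** Single place where the fields are assigned. */
    private OperatorSketch(String serializer, String handler, Optional<String> extra) {
        this.serializer = Objects.requireNonNull(serializer);
        this.handler = Objects.requireNonNull(handler);
        this.extra = extra.orElse(null);
    }

    Optional<String> getExtra() {
        return Optional.ofNullable(extra);
    }
}
```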
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java
##########
@@ -81,9 +83,17 @@
/** Return the serializer of the writer's state type. */
Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
+ /**
+ * A list of state names of sinks from which the state can be restored. For example, the new
+ * file sink can resume from the state of an old streaming file sink as a drop-in replacement
+ * when resuming from a checkpoint/savepoint.
+ */
+ default Collection<String> getCompatibleStateNames() {
Review comment:
Nit: Do we need a FLIP for this change?
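To illustrate the use described in the Javadoc, a minimal sketch that assumes restored state can be looked up by name in a map (`findRestorableState` and the map layout are hypothetical, not Flink's actual restore path): the sink's own state name is tried first, then each compatible name in iteration order.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch; the map from state name to serialized bytes is an assumed layout. */
final class CompatibleStateLookup {

    /** Returns the state stored under ownName, else under the first compatible name found. */
    static Optional<byte[]> findRestorableState(
            Map<String, byte[]> storedStates, String ownName, Collection<String> compatibleNames) {
        List<String> candidates = new ArrayList<>();
        candidates.add(ownName);
        // The order here is whatever the Collection's iterator yields.
        candidates.addAll(compatibleNames);
        for (String name : candidates) {
            byte[] state = storedStates.get(name);
            if (state != null) {
                return Optional.of(state);
            }
        }
        return Optional.empty();
    }
}
```

Note that "first compatible name" depends on the iteration order of the `Collection`, which ties back to the `Collection` vs. `List` question on `FileSink`.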
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.function.BiFunctionWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An operator that processes records to be written into a {@link
+ * org.apache.flink.api.connector.sink.Sink}. It also has a way to process committables with the
+ * same parallelism or send them downstream to a {@link CommitterOperator} with a different
+ * parallelism.
+ *
+ * <p>The operator may be part of a sink pipeline and is the first operator. There are currently two
+ * ways this operator is used:
+ *
+ * <ul>
+ * <li>In streaming mode, there is a this operator with parallelism p containing {@link
Review comment:
Remove `a`
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link
+ * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
+interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable {
+
+ /** Initializes the state of the committer and this handler. */
+ default void initializeState(StateInitializationContext context) throws Exception {}
+
+ /** Snapshots the state of the committer and this handler. */
+ default void snapshotState(StateSnapshotContext context) throws Exception {}
+
+ /**
+ * Processes the committables be either directly transforming them or by adding them to the
+ * internal state of this handler. The supplier should only be queried once.
+ *
+ * @return a list of output committables that is send downstream.
+ */
+ List<OutputT> processCommittables(
+ SupplierWithException<List<InputT>, IOException> committableSupplier)
+ throws IOException;
+
+ /**
+ * Called when no more committables are going to be added through {@link
+ * #processCommittables(SupplierWithException)}.
+ *
+ * @return a list of output committables that is send downstream.
+ */
+ default List<OutputT> endOfInput() throws IOException {
+ return Collections.emptyList();
+ }
+
+ default void close() throws Exception {}
+
+ /** Called when a checkpoint is completed and returns a list of output to be sent downstream. */
+ default Collection<OutputT> notifyCheckpointCompleted(long checkpointId) throws IOException {
+ return Collections.emptyList();
+ }
+
+ /** Swallows all committables and emits nothing. */
+ @SuppressWarnings("unchecked")
+ static <InputT, OutputT> CommitterHandler<InputT, OutputT> noop() {
Review comment:
Why is this method needed?
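For context, a static factory like `noop()` with `@SuppressWarnings("unchecked")` typically follows the same pattern as `Collections.emptyList()`: one shared instance that never produces an element, cast to whatever type parameters the caller needs. A hypothetical sketch with a simplified handler interface (no `IOException`, no state hooks; `Handler` and `NOOP` are invented names):

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch; Handler is a simplified stand-in for CommitterHandler. */
final class NoopHandlerSketch {

    interface Handler<InputT, OutputT> {
        List<OutputT> processCommittables(Supplier<List<InputT>> committableSupplier);

        default List<OutputT> endOfInput() {
            return Collections.emptyList();
        }
    }

    /** One shared instance; it never calls the supplier and never emits an OutputT. */
    private static final Handler<Object, Object> NOOP = supplier -> Collections.emptyList();

    /** The cast is safe because NOOP never produces or consumes a typed element. */
    @SuppressWarnings("unchecked")
    static <InputT, OutputT> Handler<InputT, OutputT> noop() {
        return (Handler<InputT, OutputT>) (Handler<?, ?>) NOOP;
    }
}
```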
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
+ * CommitterOperator}.
+ *
+ * @param <CommT> the type of the committable
+ * @param <GlobalCommT> the type of the global committable
+ */
+public final class CommitterOperatorFactory<CommT, GlobalCommT>
+ extends AbstractStreamOperatorFactory<byte[]>
+ implements OneInputStreamOperatorFactory<byte[], byte[]> {
+
+ private final Sink<?, CommT, ?, GlobalCommT> sink;
+ private final boolean batch;
+
+ public CommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink, boolean batch) {
+ this.sink = sink;
+ this.batch = batch;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<byte[]>> T createStreamOperator(
+ StreamOperatorParameters<byte[]> parameters) {
+
+ SimpleVersionedSerializer<CommT> committableSerializer =
+ sink.getCommittableSerializer().orElseThrow(this::noSerializerFound);
+ try {
+ CommitterHandler<CommT, GlobalCommT> committerHandler = getGlobalCommitterHandler();
+ if (batch) {
+ Optional<Committer<CommT>> committer = sink.createCommitter();
+ if (committer.isPresent()) {
+ committerHandler =
+ new BatchCommitterHandler<>(committer.get(), committerHandler);
+ }
+ }
+
+ checkState(
Review comment:
Isn't this always failing in batch mode, because batch mode sets the
committerHandler to `BatchCommitterHandler`?
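Tracing only the lines shown in this hunk may help here. The sketch below is hypothetical: the handler types are stand-ins, and it assumes `getGlobalCommitterHandler()` returns a `NoopCommitterHandler` when the sink has no `GlobalCommitter`, which this excerpt does not show.

```java
/** Hypothetical trace of the quoted branch; the handler classes are stand-ins. */
final class FactoryCheckTrace {

    interface Handler {}

    static final class NoopHandler implements Handler {}

    static final class GlobalHandler implements Handler {}

    static final class BatchHandler implements Handler {
        final Handler next;

        BatchHandler(Handler next) {
            this.next = next;
        }
    }

    /** Mirrors the quoted control flow and returns whether checkState would pass. */
    static boolean checkPasses(boolean batch, boolean hasCommitter, boolean hasGlobalCommitter) {
        // Assumption: no GlobalCommitter -> the global handler is the no-op handler.
        Handler handler = hasGlobalCommitter ? new GlobalHandler() : new NoopHandler();
        if (batch && hasCommitter) {
            handler = new BatchHandler(handler);
        }
        // checkState(!(committerHandler instanceof NoopCommitterHandler), ...)
        return !(handler instanceof NoopHandler);
    }
}
```

Under that assumption, the check rejects only a handler that is still the no-op one, i.e. when no `Committer` was wrapped (streaming mode, or batch mode without a `Committer`) and no `GlobalCommitter` exists.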
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java
##########
@@ -72,11 +58,12 @@ public void endInput() throws Exception {
}
}
globalCommitter.endOfInput();
+ return Collections.emptyList();
}
@Override
public void close() throws Exception {
- super.close();
globalCommitter.close();
+ super.close();
Review comment:
Why do we need this change? I am not fully familiar with the exact
semantics, but closing the object itself first and the `globalCommitter`
afterwards looks safer.
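The two orderings can be compared with a small stand-in. This is a hypothetical sketch: `BaseHandler` and `GlobalCommitterStub` only record calls and are not Flink classes, and the `try`/`finally`, which the quoted diff does not have, is an added assumption so that the second `close()` still runs if the first one throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch comparing the two close() orderings; all classes are stand-ins. */
final class CloseOrderSketch {

    /** Records the order in which close() calls happen. */
    static final List<String> CALLS = new ArrayList<>();

    /** Stand-in for the handler's base class, i.e. the target of super.close(). */
    static class BaseHandler implements AutoCloseable {
        @Override
        public void close() {
            CALLS.add("super");
        }
    }

    /** Stand-in for the wrapped GlobalCommitter. */
    static final class GlobalCommitterStub implements AutoCloseable {
        @Override
        public void close() {
            CALLS.add("globalCommitter");
        }
    }

    /** Order in the PR: the wrapped committer first, then the base class. */
    static final class CommitterFirst extends BaseHandler {
        private final GlobalCommitterStub globalCommitter = new GlobalCommitterStub();

        @Override
        public void close() {
            try {
                globalCommitter.close();
            } finally {
                super.close(); // still runs if the committer's close() throws
            }
        }
    }

    /** Order suggested in the review: the object itself first, then the wrapped committer. */
    static final class SelfFirst extends BaseHandler {
        private final GlobalCommitterStub globalCommitter = new GlobalCommitterStub();

        @Override
        public void close() {
            try {
                super.close();
            } finally {
                globalCommitter.close(); // still runs if the base class's close() throws
            }
        }
    }
}
```

As a point of reference, try-with-resources closes resources in the reverse order of their creation, so the "right" order here depends on which of the two was set up last and whether one still needs the other while closing.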
--
This is an automated message from the Apache Git Service.