[
https://issues.apache.org/jira/browse/FLINK-10522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710107#comment-16710107
]
ASF GitHub Bot commented on FLINK-10522:
----------------------------------------
asfgit closed pull request #7047: [FLINK-10522] Check if RecoverableWriter
supportsResume() and act accordingly.
URL: https://github.com/apache/flink/pull/7047
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628578c..82d664be868 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -124,8 +124,17 @@ private void restoreInProgressFile(final
BucketState<BucketID> state) throws IOE
// we try to resume the previous in-progress file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable =
state.getInProgressResumableFile();
- inProgressPart = partFileFactory.resumeFrom(
- bucketId, fsWriter, resumable,
state.getInProgressFileCreationTime());
+
+ if (fsWriter.supportsResume()) {
+ inProgressPart = partFileFactory.resumeFrom(
+ bucketId, fsWriter, resumable,
state.getInProgressFileCreationTime());
+ } else {
+
+ // if the writer does not support resume, then
we close the
+ // in-progress part and commit it, as done in
the case of pending files.
+
+
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
+ }
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
new file mode 100644
index 00000000000..984ba404bf7
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tests for the {@link Bucket} class.
+ */
+public class BucketTest {
+
+ private final PartFileWriter.PartFileFactory<String, Integer> factory =
+ new RowWisePartWriter.Factory<>(new
SimpleStringEncoder<>());
+
+ private final RollingPolicy<String, Integer> rollingPolicy =
DefaultRollingPolicy.create().build();
+
+ // --------------------------- Checking Restore
---------------------------
+
+ @Test
+ public void
inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws
IOException {
+ final StubNonResumableWriter nonResumableWriter = new
StubNonResumableWriter();
+ final Bucket<String, Integer> bucket =
getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);
+
+ Assert.assertThat(nonResumableWriter,
hasMethodCallCountersEqualTo(1, 0, 1));
+ Assert.assertThat(bucket, hasNullInProgressFile(true));
+ }
+
+ @Test
+ public void inProgressFileShouldBeRestoredIfWriterSupportsResume()
throws IOException {
+ final StubResumableWriter resumableWriter = new
StubResumableWriter();
+ final Bucket<String, Integer> bucket =
getRestoredBucketWithOnlyInProgressPart(resumableWriter);
+
+ Assert.assertThat(resumableWriter,
hasMethodCallCountersEqualTo(1, 1, 0));
+ Assert.assertThat(bucket, hasNullInProgressFile(false));
+ }
+
+ @Test
+ public void pendingFilesShouldBeRestored() throws IOException {
+ final int expectedRecoverForCommitCounter = 10;
+
+ final StubNonResumableWriter writer = new
StubNonResumableWriter();
+ final Bucket<String, Integer> bucket =
getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);
+
+ Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0,
expectedRecoverForCommitCounter));
+ Assert.assertThat(bucket, hasNullInProgressFile(true));
+ }
+
+ // ---------------------------------- Matchers
----------------------------------
+
+ private static TypeSafeMatcher<Bucket<String, Integer>>
hasNullInProgressFile(final boolean isNull) {
+
+ return new TypeSafeMatcher<Bucket<String, Integer>>() {
+ @Override
+ protected boolean matchesSafely(Bucket<String, Integer>
bucket) {
+ final PartFileWriter<String, Integer>
inProgressPart = bucket.getInProgressPart();
+ return isNull == (inProgressPart == null);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a Bucket with its
inProgressPart being ")
+ .appendText(isNull ? " null." :
" not null.");
+ }
+ };
+ }
+
+ private static TypeSafeMatcher<BaseStubWriter>
hasMethodCallCountersEqualTo(
+ final int supportsResumeCalls,
+ final int recoverCalls,
+ final int recoverForCommitCalls) {
+
+ return new TypeSafeMatcher<BaseStubWriter>() {
+ @Override
+ protected boolean matchesSafely(BaseStubWriter writer) {
+ return writer.getSupportsResumeCallCounter() ==
supportsResumeCalls &&
+ writer.getRecoverCallCounter()
== recoverCalls &&
+
writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a Writer where:")
+ .appendText(" supportsResume
was called ").appendValue(supportsResumeCalls).appendText(" times,")
+ .appendText(" recover was
called ").appendValue(recoverCalls).appendText(" times,")
+ .appendText(" and
recoverForCommit was called ").appendValue(recoverForCommitCalls).appendText("
times.")
+ .appendText("'");
+ }
+ };
+ }
+
+ // ---------------------------------- Utility Methods
----------------------------------
+
+ private Bucket<String, Integer>
getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws
IOException {
+ final BucketState<Integer> stateWithOnlyInProgressFile =
+ new BucketState<>(5, new Path(), 12345L, new
NoOpRecoverable(), new HashMap<>());
+ return Bucket.restore(writer, 0, 1L, factory, rollingPolicy,
stateWithOnlyInProgressFile);
+ }
+
+ private Bucket<String, Integer>
getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int
numberOfPendingParts) throws IOException {
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>>
completePartsPerCheckpoint =
+
createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+ final BucketState<Integer> initStateWithOnlyInProgressFile =
+ new BucketState<>(5, new Path(), 12345L, null,
completePartsPerCheckpoint);
+ return Bucket.restore(writer, 0, 1L, factory, rollingPolicy,
initStateWithOnlyInProgressFile);
+ }
+
+ private Map<Long, List<RecoverableWriter.CommitRecoverable>>
createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>>
pendingCommittablesPerCheckpoint = new HashMap<>();
+ for (int checkpointId = 0; checkpointId < noOfCheckpoints;
checkpointId++) {
+ final List<RecoverableWriter.CommitRecoverable> pending
= new ArrayList<>();
+ pending.add(new NoOpRecoverable());
+ pendingCommittablesPerCheckpoint.put((long)
checkpointId, pending);
+ }
+ return pendingCommittablesPerCheckpoint;
+ }
+
+ // ---------------------------------- Test Classes
----------------------------------
+
+ /**
+ * A test implementation of a {@link RecoverableWriter} that supports
+ * resuming, i.e. keeping on writing to the in-progress file at the point
+ * we were before the failure.
+ */
+ private static class StubResumableWriter extends BaseStubWriter {
+
+ StubResumableWriter() {
+ super(true);
+ }
+ }
+
+ /**
+ * A test implementation of a {@link RecoverableWriter} that does not
support
+ * resuming, i.e. keep on writing to the in-progress file at the point
we were
+ * before the failure.
+ */
+ private static class StubNonResumableWriter extends BaseStubWriter {
+
+ StubNonResumableWriter() {
+ super(false);
+ }
+ }
+
+ /**
+ * A test implementation of a {@link RecoverableWriter} whose support for
+ * resuming is set at construction time and which counts the calls made to
+ * {@code supportsResume()}, {@code recover()} and {@code recoverForCommit()}.
+ */
+ private static class BaseStubWriter extends NoOpRecoverableWriter {
+
+ private final boolean supportsResume;
+
+ private int supportsResumeCallCounter = 0;
+
+ private int recoverCallCounter = 0;
+
+ private int recoverForCommitCallCounter = 0;
+
+ private BaseStubWriter(final boolean supportsResume) {
+ this.supportsResume = supportsResume;
+ }
+
+ int getSupportsResumeCallCounter() {
+ return supportsResumeCallCounter;
+ }
+
+ int getRecoverCallCounter() {
+ return recoverCallCounter;
+ }
+
+ int getRecoverForCommitCallCounter() {
+ return recoverForCommitCallCounter;
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream recover(ResumeRecoverable
resumable) throws IOException {
+ recoverCallCounter++;
+ return new NoOpRecoverableFsDataOutputStream();
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream.Committer
recoverForCommit(CommitRecoverable resumable) throws IOException {
+ checkArgument(resumable instanceof NoOpRecoverable);
+ recoverForCommitCallCounter++;
+ return new NoOpCommitter();
+ }
+
+ @Override
+ public boolean supportsResume() {
+ supportsResumeCallCounter++;
+ return supportsResume;
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java
new file mode 100644
index 00000000000..06005a15a48
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream.Committer
committer}
+ * that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpCommitter implements RecoverableFsDataOutputStream.Committer {
+
+ @Override
+ public void commit() throws IOException {
+
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+
+ }
+
+ @Override
+ public RecoverableWriter.CommitRecoverable getRecoverable() {
+ return null;
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java
new file mode 100644
index 00000000000..e00d4c4b0a9
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+/**
+ * An implementation of the {@link RecoverableWriter.ResumeRecoverable
ResumeRecoverable}
+ * that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverable implements RecoverableWriter.ResumeRecoverable {
+
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java
new file mode 100644
index 00000000000..a549896922d
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * A default implementation of the {@link RecoverableFsDataOutputStream} that
does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverableFsDataOutputStream extends
RecoverableFsDataOutputStream {
+ @Override
+ public RecoverableWriter.ResumeRecoverable persist() throws IOException
{
+ return null;
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void flush() throws IOException {
+
+ }
+
+ @Override
+ public void sync() throws IOException {
+
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
new file mode 100644
index 00000000000..e6994341966
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A default implementation of the {@link RecoverableWriter} that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverableWriter implements RecoverableWriter {
+
+ @Override
+ public RecoverableFsDataOutputStream open(Path path) throws IOException
{
+ return null;
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream
recover(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
+ return null;
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream.Committer
recoverForCommit(RecoverableWriter.CommitRecoverable resumable) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable>
getCommitRecoverableSerializer() {
+ return null;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable>
getResumeRecoverableSerializer() {
+ return null;
+ }
+
+ @Override
+ public boolean supportsResume() {
+ return false;
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Check if RecoverableWriter supportsResume and act accordingly.
> --------------------------------------------------------------
>
> Key: FLINK-10522
> URL: https://issues.apache.org/jira/browse/FLINK-10522
> Project: Flink
> Issue Type: Sub-task
> Components: filesystem-connector
> Affects Versions: 1.6.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> So far we assumed that all `RecoverableWriters` support "resuming", i.e.
> after recovering from a failure or from a savepoint they could keep writing
> to the previously "in-progress" file. This assumption holds for all current
> writers, but in order to also accommodate filesystems that may not support
> this operation, we should check upon initialization whether the writer
> supports resuming: if it does, we proceed as before; if not, we recover for
> commit and commit the previously in-progress file.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)