[ https://issues.apache.org/jira/browse/FLINK-10522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708556#comment-16708556 ]
ASF GitHub Bot commented on FLINK-10522:
----------------------------------------
azagrebin commented on a change in pull request #7047: [FLINK-10522] Check if RecoverableWriter supportsResume() and act accordingly.
URL: https://github.com/apache/flink/pull/7047#discussion_r238593584
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tests for the {@link Bucket} class.
+ */
+public class BucketTest {
+
+	private final PartFileWriter.PartFileFactory<String, Integer> factory =
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+	private final RollingPolicy<String, Integer> rollingPolicy = DefaultRollingPolicy.create().build();
+
+	// --------------------------- Checking Restore ---------------------------
+
+	@Test
+	public void inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws IOException {
+		final StubNonResumableWriter nonResumableWriter = new StubNonResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);
+
+		Assert.assertThat(nonResumableWriter, hasMethodCallCountersEqualTo(1, 0, 1));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
+	@Test
+	public void inProgressFileShouldBeRestoredIfWriterSupportsResume() throws IOException {
+		final StubResumableWriter resumableWriter = new StubResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyInProgressPart(resumableWriter);
+
+		Assert.assertThat(resumableWriter, hasMethodCallCountersEqualTo(1, 1, 0));
+		Assert.assertThat(bucket, hasNullInProgressFile(false));
+	}
+
+	@Test
+	public void pendingFilesShouldBeRestored() throws IOException {
+		final int expectedRecoverForCommitCounter = 10;
+
+		final StubNonResumableWriter writer = new StubNonResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);
+
+		Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0, expectedRecoverForCommitCounter));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
+	// ---------------------------------- Matchers ----------------------------------
+
+	private static TypeSafeMatcher<Bucket<String, Integer>> hasNullInProgressFile(final boolean isNull) {
+
+		return new TypeSafeMatcher<Bucket<String, Integer>>() {
+			@Override
+			protected boolean matchesSafely(Bucket<String, Integer> bucket) {
+				final PartFileWriter<String, Integer> inProgressPart = bucket.getInProgressPart();
+				return isNull == (inProgressPart == null);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Bucket with its inProgressPart being ")
+						.appendText(isNull ? " null." : " not null.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BaseStubWriter> hasMethodCallCountersEqualTo(
+			final int supportsResumeCalls,
+			final int recoverCalls,
+			final int recoverForCommitCalls) {
+
+		return new TypeSafeMatcher<BaseStubWriter>() {
+			@Override
+			protected boolean matchesSafely(BaseStubWriter writer) {
+				return writer.getSupportsResumeCallCounter() == supportsResumeCalls &&
+						writer.getRecoverCallCounter() == recoverCalls &&
+						writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Writer where:")
+						.appendText(" supportsResume was called ").appendValue(supportsResumeCalls).appendText(" times,")
+						.appendText(" recover was called ").appendValue(recoverCalls).appendText(" times,")
+						.appendText(" and recoverForCommit was called ").appendValue(recoverForCommitCalls).appendText(" times.")
+						.appendText("'");
+			}
+		};
+	}
+
+	// ---------------------------------- Utility Methods ----------------------------------
+
+	private Bucket<String, Integer> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
+		final BucketState<Integer> stateWithOnlyInProgressFile =
+				new BucketState<>(5, new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
+		return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, stateWithOnlyInProgressFile);
+	}
+
+	private Bucket<String, Integer> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+				createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+		final BucketState<Integer> initStateWithOnlyInProgressFile =
+				new BucketState<>(5, new Path(), 12345L, null, completePartsPerCheckpoint);
+		return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, initStateWithOnlyInProgressFile);
+	}
+
+	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+		for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
+			final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
+			pending.add(new NoOpRecoverable());
+			pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
+		}
+		return pendingCommittablesPerCheckpoint;
+	}
+
+	// ---------------------------------- Test Classes ----------------------------------
+
+	/**
+	 * A test implementation of a {@link RecoverableWriter} that does not support
+	 * resuming, i.e. keep on writing to the in-progress file at the point we were
+	 * before the failure.
Review comment:
The comments for the 3 stub classes look like they need to be updated accordingly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Check if RecoverableWriter supportsResume and act accordingly.
> --------------------------------------------------------------
>
> Key: FLINK-10522
> URL: https://issues.apache.org/jira/browse/FLINK-10522
> Project: Flink
> Issue Type: Sub-task
> Components: filesystem-connector
> Affects Versions: 1.6.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> So far we have assumed that all `RecoverableWriters` support "resuming", i.e.
> after recovering from a failure or from a savepoint they can keep writing to
> the previously "in-progress" file. This assumption holds for all current
> writers, but in order to also accommodate filesystems that may not support
> this operation, we should check upon initialization whether the writer
> supports resuming: if it does, we proceed as before; if not, we recover the
> previously in-progress file for commit and commit it.
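
For illustration, a minimal sketch of the restore-time branching described above. It only assumes the existing RecoverableWriter API (supportsResume(), recover(ResumeRecoverable), recoverForCommit(CommitRecoverable), where ResumeRecoverable extends CommitRecoverable); the surrounding class and method names are hypothetical and not the actual Bucket code:

    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.io.IOException;

    class ResumeOrCommitSketch {

        // Hypothetical helper showing the branching; the real logic lives in the Bucket restore path.
        void restoreInProgressFile(
                RecoverableWriter fsWriter,
                RecoverableWriter.ResumeRecoverable inProgressResumable) throws IOException {

            if (fsWriter.supportsResume()) {
                // The filesystem can re-open the in-progress file, so keep writing to it.
                RecoverableFsDataOutputStream stream = fsWriter.recover(inProgressResumable);
                // ... wrap 'stream' in a new in-progress part file and continue writing ...
            } else {
                // No resume support: recover the previous in-progress file for commit and finalize it.
                fsWriter.recoverForCommit(inProgressResumable).commitAfterRecovery();
                // ... the restored bucket is left without an in-progress part ...
            }
        }
    }

The tests in the diff above exercise exactly these two branches through StubResumableWriter and StubNonResumableWriter, counting the calls to supportsResume, recover and recoverForCommit.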
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)