This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0757e95c3c33d4f13377f2c14c179076b3795b2 Author: Yun Gao <[email protected]> AuthorDate: Wed Nov 25 23:29:38 2020 +0800 [FLINK-20337] Let FileSink restore state from StreamingFileSink The FileWriterBucketStateSerializer is bumped past version 2 to allow it to restore the same old versions as BucketStateSerializer (from StreamingFileSink). --- .../apache/flink/connector/file/sink/FileSink.java | 14 +-- .../file/sink/writer/FileWriterBucket.java | 12 +++ .../file/sink/writer/FileWriterBucketState.java | 33 +++++++ .../writer/FileWriterBucketStateSerializer.java | 110 ++++++++++++++++++--- .../FileWriterBucketStateSerializerTest.java | 4 +- .../file/sink/writer/FileWriterBucketTest.java | 59 +++++++++++ .../OutputStreamBasedPartFileWriter.java | 23 +++-- 7 files changed, 229 insertions(+), 26 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java index 13d36d3..ccd010d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java @@ -293,10 +293,11 @@ public class FileSink<IN> @Override SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() throws IOException { + BucketWriter<IN, String> bucketWriter = createBucketWriter(); + return new FileWriterBucketStateSerializer( - createBucketWriter() - .getProperties() - .getInProgressFileRecoverableSerializer()); + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketWriter.getProperties().getPendingFileRecoverableSerializer()); } @Override @@ -444,10 +445,11 @@ public class FileSink<IN> @Override SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() throws IOException { + BucketWriter<IN, String> bucketWriter = 
createBucketWriter(); + return new FileWriterBucketStateSerializer( - createBucketWriter() - .getProperties() - .getInProgressFileRecoverableSerializer()); + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketWriter.getProperties().getPendingFileRecoverableSerializer()); } @Override diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java index 33a4469..5cb135a 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java @@ -118,6 +118,10 @@ class FileWriterBucket<IN> { outputFileConfig); restoreInProgressFile(bucketState); + + // Restore pending files, this only makes a difference if we are + // migrating from {@code StreamingFileSink}. + cacheRecoveredPendingFiles(bucketState); } private void restoreInProgressFile(FileWriterBucketState state) throws IOException { @@ -137,6 +141,14 @@ class FileWriterBucket<IN> { } } + private void cacheRecoveredPendingFiles(FileWriterBucketState state) { + // Cache the previous pending files and send to committer on the first prepareCommit operation. 
+ for (List<InProgressFileWriter.PendingFileRecoverable> restoredPendingRecoverables : + state.getPendingFileRecoverablesPerCheckpoint().values()) { + pendingFiles.addAll(restoredPendingRecoverables); + } + } + public String getBucketId() { return bucketId; } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java index 72f88c1..cbab95e 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java @@ -23,7 +23,12 @@ import org.apache.flink.core.fs.Path; import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable; +import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; /** * States for {@link FileWriterBucket}. @@ -49,15 +54,31 @@ public class FileWriterBucketState { @Nullable private final InProgressFileRecoverable inProgressFileRecoverable; + /** + * The {@link PendingFileRecoverable} should be empty unless we + * are migrating from {@code StreamingFileSink}. 
+ */ + private final Map<Long, List<PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint; + public FileWriterBucketState( String bucketId, Path bucketPath, long inProgressFileCreationTime, @Nullable InProgressFileRecoverable inProgressFileRecoverable) { + this(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, new HashMap<>()); + } + + public FileWriterBucketState( + String bucketId, + Path bucketPath, + long inProgressFileCreationTime, + @Nullable InProgressFileRecoverable inProgressFileRecoverable, + Map<Long, List<PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint) { this.bucketId = bucketId; this.bucketPath = bucketPath; this.inProgressFileCreationTime = inProgressFileCreationTime; this.inProgressFileRecoverable = inProgressFileRecoverable; + this.pendingFileRecoverablesPerCheckpoint = pendingFileRecoverablesPerCheckpoint; } public String getBucketId() { @@ -81,6 +102,10 @@ public class FileWriterBucketState { return inProgressFileRecoverable != null; } + public Map<Long, List<PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() { + return pendingFileRecoverablesPerCheckpoint; + } + @Override public String toString() { final StringBuilder strBuilder = new StringBuilder(); @@ -92,6 +117,14 @@ public class FileWriterBucketState { if (hasInProgressFileRecoverable()) { strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime); } + + if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) { + strBuilder.append(", has pending files for checkpoints: {"); + for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) { + strBuilder.append(checkpointId).append(' '); + } + strBuilder.append('}'); + } return strBuilder.toString(); } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java index a40cddb9..659ee0d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java @@ -20,17 +20,28 @@ package org.apache.flink.connector.file.sink.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.util.function.BiFunctionWithException; +import org.apache.flink.util.function.FunctionWithException; + +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable; +import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -44,22 +55,25 @@ public class FileWriterBucketStateSerializer private final 
SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer; + private final SimpleVersionedSerializer<PendingFileRecoverable> pendingFileRecoverableSerializer; + public FileWriterBucketStateSerializer( - SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer) { - this.inProgressFileRecoverableSerializer = checkNotNull( - inProgressFileRecoverableSerializer); + SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer, + SimpleVersionedSerializer<PendingFileRecoverable> pendingFileRecoverableSerializer) { + this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer); + this.pendingFileRecoverableSerializer = checkNotNull(pendingFileRecoverableSerializer); } @Override public int getVersion() { - return 1; + return 3; } @Override public byte[] serialize(FileWriterBucketState state) throws IOException { DataOutputSerializer out = new DataOutputSerializer(256); out.writeInt(MAGIC_NUMBER); - serializeV1(state, out); + serializeV3(state, out); return out.getCopyOfBuffer(); } @@ -73,12 +87,18 @@ public class FileWriterBucketStateSerializer case 1: validateMagicNumber(in); return deserializeV1(in); + case 2: + validateMagicNumber(in); + return deserializeV2(in); + case 3: + validateMagicNumber(in); + return deserializeV3(in); default: throw new IOException("Unrecognized version or corrupt state: " + version); } } - private void serializeV1( + private void serializeV3( FileWriterBucketState state, DataOutputView dataOutputView) throws IOException { SimpleVersionedSerialization.writeVersionAndSerialize( @@ -102,7 +122,41 @@ public class FileWriterBucketStateSerializer } } - private FileWriterBucketState deserializeV1(DataInputView dataInputView) throws IOException { + private FileWriterBucketState deserializeV1(DataInputView in) throws IOException { + final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = 
getCommitableSerializer(); + final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer(); + + return internalDeserialize( + in, + dataInputView -> new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable( + SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, dataInputView)), + (version, bytes) -> new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable( + commitableSerializer.deserialize(version, bytes))); + } + + private FileWriterBucketState deserializeV2(DataInputView in) throws IOException { + return internalDeserialize( + in, + dataInputView -> SimpleVersionedSerialization.readVersionAndDeSerialize( + inProgressFileRecoverableSerializer, + dataInputView), + pendingFileRecoverableSerializer::deserialize); + } + + private FileWriterBucketState deserializeV3(DataInputView in) throws IOException { + return internalDeserialize( + in, + dataInputView -> SimpleVersionedSerialization.readVersionAndDeSerialize( + inProgressFileRecoverableSerializer, + dataInputView), + null); + } + + private FileWriterBucketState internalDeserialize( + DataInputView dataInputView, + FunctionWithException<DataInputView, InProgressFileRecoverable, IOException> inProgressFileParser, + @Nullable BiFunctionWithException<Integer, byte[], PendingFileRecoverable, IOException> pendingFileParser) throws IOException { + String bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize( SimpleVersionedStringSerializer.INSTANCE, dataInputView); @@ -112,19 +166,37 @@ public class FileWriterBucketStateSerializer // then get the current resumable stream InProgressFileRecoverable current = null; if (dataInputView.readBoolean()) { - current = SimpleVersionedSerialization.readVersionAndDeSerialize( - inProgressFileRecoverableSerializer, - dataInputView); + current = inProgressFileParser.apply(dataInputView); + } + + HashMap<Long, 
List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(); + if (pendingFileParser != null) { + final int pendingFileRecoverableSerializerVersion = dataInputView.readInt(); + final int numCheckpoints = dataInputView.readInt(); + + for (int i = 0; i < numCheckpoints; i++) { + final long checkpointId = dataInputView.readLong(); + final int numOfPendingFileRecoverables = dataInputView.readInt(); + + final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables); + for (int j = 0; j < numOfPendingFileRecoverables; j++) { + final byte[] bytes = new byte[dataInputView.readInt()]; + dataInputView.readFully(bytes); + pendingFileRecoverables.add(pendingFileParser.apply(pendingFileRecoverableSerializerVersion, bytes)); + } + pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables); + } } return new FileWriterBucketState( bucketId, new Path(bucketPathStr), creationTime, - current); + current, + pendingFileRecoverablesPerCheckpoint); } - private static void validateMagicNumber(DataInputView in) throws IOException { + private void validateMagicNumber(DataInputView in) throws IOException { int magicNumber = in.readInt(); if (magicNumber != MAGIC_NUMBER) { throw new IOException(String.format( @@ -132,4 +204,18 @@ public class FileWriterBucketStateSerializer magicNumber)); } } + + private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() { + final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer + outputStreamBasedInProgressFileRecoverableSerializer = + (OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer; + return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer(); + } + + private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() { + final 
OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer + outputStreamBasedPendingFileRecoverableSerializer = + (OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer; + return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer(); + } } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java index 445d00b6..d173434 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java @@ -70,7 +70,9 @@ public class FileWriterBucketStateSerializerTest { private FileWriterBucketState serializeAndDeserialize(FileWriterBucketState bucketState) throws IOException { FileWriterBucketStateSerializer serializer = new FileWriterBucketStateSerializer( new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>( - FileSinkTestUtils.TestInProgressFileRecoverable::new)); + FileSinkTestUtils.TestInProgressFileRecoverable::new), + new FileSinkTestUtils.SimpleVersionedWrapperSerializer<>( + FileSinkTestUtils.TestPendingFileRecoverable::new)); byte[] data = serializer.serialize(bucketState); return serializer.deserialize(serializer.getVersion(), data); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java index 4d3f805..ec91dd7 100644 --- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.fs.local.LocalRecoverableWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter; import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; @@ -45,7 +46,10 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertEquals; @@ -254,6 +258,28 @@ public class FileWriterBucketTest { bucketState.getInProgressFileRecoverable()); } + /** + * Tests restoring with state containing pending files. This might happen + * if we are migrating from {@code StreamingFileSink}. 
+ */ + @Test + public void testRestoringWithOnlyPendingFiles() throws IOException { + final int noOfPendingFileCheckpoints = 4; + + StubResumableWriter resumableWriter = new StubResumableWriter(); + FileWriterBucket<String> bucket = getRestoredBucketWithOnlyPendingFiles( + resumableWriter, + DEFAULT_ROLLING_POLICY, + noOfPendingFileCheckpoints); + assertNull("There should be no in-progress file", bucket.getInProgressPart()); + // There is one pending file for each checkpoint + assertEquals(noOfPendingFileCheckpoints, bucket.getPendingFiles().size()); + + List<FileSinkCommittable> fileSinkCommittables = bucket.prepareCommit(false); + bucket.snapshotState(); + compareNumberOfPendingAndInProgress(fileSinkCommittables, noOfPendingFileCheckpoints, 0); + } + @Test public void testMergeWithInprogressFileNotSupportResume() throws IOException { FileWriterBucket<String> bucket1 = getRestoredBucketWithOnlyInProgressPart( @@ -487,4 +513,37 @@ public class FileWriterBucketTest { stateWithOnlyInProgressFile, OutputFileConfig.builder().build()); } + + private FileWriterBucket<String> getRestoredBucketWithOnlyPendingFiles( + BaseStubWriter writer, + RollingPolicy<String, String> rollingPolicy, + int numberOfPendingParts) throws IOException { + + Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint = + createPendingPartsPerCheckpoint(numberOfPendingParts); + + FileWriterBucketState state = + new FileWriterBucketState( + "test", + new Path("file:///fake/fakefile"), + 12345L, + null, + completePartsPerCheckpoint); + + return FileWriterBucket.restore( + new RowWiseBucketWriter<>(writer, ENCODER), + rollingPolicy, + state, + OutputFileConfig.builder().build()); + } + + private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) { + final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>(); + for (int checkpointId = 0; checkpointId 
< noOfCheckpoints; checkpointId++) { + final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>(); + pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable())); + pendingCommittablesPerCheckpoint.put((long) checkpointId, pending); + } + return pendingCommittablesPerCheckpoint; + } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java index 64c2bae..715bf33 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java @@ -135,11 +135,14 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst final long creationTime) throws IOException; } - static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable { + /** + * The {@link PendingFileRecoverable} implementation for {@link OutputStreamBasedBucketWriter}. + */ + public static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable { private final RecoverableWriter.CommitRecoverable commitRecoverable; - OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) { + public OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) { this.commitRecoverable = commitRecoverable; } @@ -149,7 +152,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst } /** - * A kind of {@link InProgressFileRecoverable} implementation. 
+ * The {@link InProgressFileRecoverable} implementation for {@link OutputStreamBasedBucketWriter}. */ public static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable { @@ -183,7 +186,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst } } - static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> { + /** + * The serializer for {@link OutputStreamBasedInProgressFileRecoverable}. + */ + public static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> { private static final int MAGIC_NUMBER = 0xb3a4073d; @@ -219,7 +225,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst } } - SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() { + public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() { return resumeSerializer; } @@ -239,7 +245,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst } } - static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> { + /** + * The serializer for {@link OutputStreamBasedPendingFileRecoverable}. + */ + public static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> { private static final int MAGIC_NUMBER = 0x2c853c89; @@ -276,7 +285,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends Abst } } - SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() { + public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() { return this.commitSerializer; }
