This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7abdb0e [FLINK-26717][fs][core] Move s3 common utils to flink-core
7abdb0e is described below
commit 7abdb0e5a8cd5b28e56d0bae5095197b95be819f
Author: Jinhu Wu <[email protected]>
AuthorDate: Mon Mar 21 10:24:16 2022 +0800
[FLINK-26717][fs][core] Move s3 common utils to flink-core
This closes #19141
---
.../main/java/org/apache/flink/core/fs}/BackPressuringExecutor.java | 2 +-
.../java/org/apache/flink/core/fs}/OffsetAwareOutputStream.java | 2 +-
.../org/apache/flink/core/fs}/RefCountedBufferingFileStream.java | 4 ++--
.../java/org/apache/flink/core/fs}/RefCountedFSOutputStream.java | 3 +--
.../src/main/java/org/apache/flink/core/fs/RefCountedFile.java | 2 --
.../java/org/apache/flink/core/fs}/RefCountedFileWithStream.java | 3 +--
.../java/org/apache/flink/core/fs}/RefCountedTmpFileCreator.java | 2 +-
.../apache/flink/core/fs}/RefCountedBufferingFileStreamTest.java | 2 +-
.../org/apache/flink/core/fs}/RefCountedFileWithStreamTest.java | 2 +-
.../main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java | 4 ++--
.../flink/fs/s3/common/writer/RecoverableMultiPartUpload.java | 2 +-
.../flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java | 2 +-
.../flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java | 6 +++---
.../fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java | 4 ++--
.../org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java | 2 +-
.../fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java | 4 ++--
.../fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java | 6 +++---
17 files changed, 24 insertions(+), 28 deletions(-)
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java
b/flink-core/src/main/java/org/apache/flink/core/fs/BackPressuringExecutor.java
similarity index 98%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/BackPressuringExecutor.java
index c58212a..8cc8316 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/BackPressuringExecutor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
b/flink-core/src/main/java/org/apache/flink/core/fs/OffsetAwareOutputStream.java
similarity index 97%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/OffsetAwareOutputStream.java
index aa8ef9b..3ee4b76 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/OffsetAwareOutputStream.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedBufferingFileStream.java
similarity index 97%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/RefCountedBufferingFileStream.java
index c1e257e..dbf1b97 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedBufferingFileStream.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -109,7 +109,7 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
@Override
public void sync() throws IOException {
throw new UnsupportedOperationException(
- "S3RecoverableFsDataOutputStream cannot sync state to S3. "
+ "Cannot sync state to system like S3. "
+ "Use persist() to create a persistent recoverable intermediate point.");
}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFSOutputStream.java
similarity index 94%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFSOutputStream.java
index 73b57fc..1f65537 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFSOutputStream.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.util.RefCounted;
import java.io.File;
diff --git
a/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
index 08c8d48..a819d96 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFile.java
@@ -19,7 +19,6 @@
package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.RefCounted;
@@ -82,7 +81,6 @@ public class RefCountedFile implements RefCounted {
}
}
- @VisibleForTesting
public int getReferenceCounter() {
return references.get();
}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFileWithStream.java
similarity index 96%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFileWithStream.java
index 7589261..360e7ea 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStream.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedFileWithStream.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.util.IOUtils;
import java.io.File;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedTmpFileCreator.java
similarity index 98%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
rename to
flink-core/src/main/java/org/apache/flink/core/fs/RefCountedTmpFileCreator.java
index 1330557..173a86f 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedTmpFileCreator.java
+++
b/flink-core/src/main/java/org/apache/flink/core/fs/RefCountedTmpFileCreator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.function.FunctionWithException;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedBufferingFileStreamTest.java
similarity index 99%
rename from
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
rename to
flink-core/src/test/java/org/apache/flink/core/fs/RefCountedBufferingFileStreamTest.java
index d88bc76..0b21ce2 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
+++
b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedBufferingFileStreamTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.junit.Assert;
import org.junit.Rule;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileWithStreamTest.java
similarity index 98%
rename from
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
rename to
flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileWithStreamTest.java
index 3e9955f..1cfce94 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileWithStreamTest.java
+++
b/flink-core/src/test/java/org/apache/flink/core/fs/RefCountedFileWithStreamTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.fs.s3.common.utils;
+package org.apache.flink.core.fs;
import org.junit.Assert;
import org.junit.Rule;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 8339fbc..6d1b4d8 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -21,8 +21,8 @@ package org.apache.flink.fs.s3.common;
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedTmpFileCreator;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
index 6676a80..f6381f9 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUpload.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
import javax.annotation.Nullable;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 59162e2..ddc6ceb 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -20,7 +20,7 @@ package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartResult;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
index c7ed934..8cce4ce 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStream.java
@@ -21,9 +21,9 @@ package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.commons.io.IOUtils;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index de7f678..368576b 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,9 +19,9 @@
package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.BackPressuringExecutor;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a5216e..352cecd 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.hadoop.fs.FileSystem;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index e7fcf98..8911de5 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.fs.s3.common.writer;
-import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.MathUtils;
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
index 35be07f..5c71565 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFSOutputStream;
-import org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;