This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fbce927d8 [Engine][Checkpoint]Keep Storage are single instance (#3709)
fbce927d8 is described below
commit fbce927d8dd0d6affc70a43188924b56a8d9d29d
Author: Kirs <[email protected]>
AuthorDate: Tue Dec 13 10:51:28 2022 +0800
[Engine][Checkpoint]Keep Storage are single instance (#3709)
* [Engine][Checkpoint]Keep Storage are single instance
* rename method
---
.../checkpoint/storage/hdfs/HdfsStorage.java | 2 +-
.../storage/hdfs/HdfsStorageFactory.java | 6 ++-
.../hdfs/common/HdfsFileStorageInstance.java | 57 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index cb76a8198..fdd3c2d2c 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -88,7 +88,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
Path filePath = new Path(getStorageParentDirectory() +
state.getJobId() + "/" + getCheckPointName(state));
Path tmpFilePath = new Path(getStorageParentDirectory() +
state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
- try (FSDataOutputStream out = fs.create(tmpFilePath)) {
+ try (FSDataOutputStream out = fs.create(tmpFilePath, false)) {
out.write(datas);
} catch (IOException e) {
throw new CheckpointStorageException("Failed to write checkpoint
data, state: " + state, e);
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index d528dc9f5..43111cf67 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.HdfsFileStorageInstance;
import com.google.auto.service.AutoService;
@@ -63,6 +64,9 @@ public class HdfsStorageFactory implements
CheckpointStorageFactory {
@Override
public CheckpointStorage create(Map<String, String> configuration) throws
CheckpointStorageException {
- return new HdfsStorage(configuration);
+ if (HdfsFileStorageInstance.isFsNull()) {
+ return HdfsFileStorageInstance.getOrCreateStorage(configuration);
+ }
+ return HdfsFileStorageInstance.getHdfsStorage();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
new file mode 100644
index 000000000..bdde93d04
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage;
+
+import java.util.Map;
+
+public class HdfsFileStorageInstance {
+ private HdfsFileStorageInstance() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ private static HdfsStorage HDFS_STORAGE;
+ private static final Object LOCK = new Object();
+
+ public static boolean isFsNull() {
+ return HDFS_STORAGE == null;
+ }
+
+ public static HdfsStorage getHdfsStorage() {
+ return HDFS_STORAGE;
+ }
+
+ public static HdfsStorage getOrCreateStorage(Map<String, String> config)
throws CheckpointStorageException {
+ if (null != HDFS_STORAGE) {
+ return HDFS_STORAGE;
+ }
+ synchronized (LOCK) {
+ if (null != HDFS_STORAGE) {
+ return HDFS_STORAGE;
+ }
+ HDFS_STORAGE = new HdfsStorage(config);
+ return HDFS_STORAGE;
+ }
+ }
+
+}