This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fbce927d8 [Engine][Checkpoint]Keep Storage are single instance (#3709)
fbce927d8 is described below

commit fbce927d8dd0d6affc70a43188924b56a8d9d29d
Author: Kirs <[email protected]>
AuthorDate: Tue Dec 13 10:51:28 2022 +0800

    [Engine][Checkpoint]Keep Storage are single instance (#3709)
    
    * [Engine][Checkpoint]Keep Storage are single instance
    
    * rename method
---
 .../checkpoint/storage/hdfs/HdfsStorage.java       |  2 +-
 .../storage/hdfs/HdfsStorageFactory.java           |  6 ++-
 .../hdfs/common/HdfsFileStorageInstance.java       | 57 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index cb76a8198..fdd3c2d2c 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -88,7 +88,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
         Path filePath = new Path(getStorageParentDirectory() + 
state.getJobId() + "/" + getCheckPointName(state));
 
         Path tmpFilePath = new Path(getStorageParentDirectory() + 
state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
-        try (FSDataOutputStream out = fs.create(tmpFilePath)) {
+        try (FSDataOutputStream out = fs.create(tmpFilePath, false)) {
             out.write(datas);
         } catch (IOException e) {
             throw new CheckpointStorageException("Failed to write checkpoint 
data, state: " + state, e);
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index d528dc9f5..43111cf67 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
 import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.HdfsFileStorageInstance;
 
 import com.google.auto.service.AutoService;
 
@@ -63,6 +64,9 @@ public class HdfsStorageFactory implements 
CheckpointStorageFactory {
 
     @Override
     public CheckpointStorage create(Map<String, String> configuration) throws 
CheckpointStorageException {
-        return new HdfsStorage(configuration);
+        if (HdfsFileStorageInstance.isFsNull()) {
+            return HdfsFileStorageInstance.getOrCreateStorage(configuration);
+        }
+        return HdfsFileStorageInstance.getHdfsStorage();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
new file mode 100644
index 000000000..bdde93d04
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsFileStorageInstance.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage;
+
+import java.util.Map;
+
+public class HdfsFileStorageInstance {
+    private HdfsFileStorageInstance() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static HdfsStorage HDFS_STORAGE;
+    private static final Object LOCK = new Object();
+
+    public static boolean isFsNull() {
+        return HDFS_STORAGE == null;
+    }
+
+    public static HdfsStorage getHdfsStorage() {
+        return HDFS_STORAGE;
+    }
+
+    public static HdfsStorage getOrCreateStorage(Map<String, String> config) 
throws CheckpointStorageException {
+        if (null != HDFS_STORAGE) {
+            return HDFS_STORAGE;
+        }
+        synchronized (LOCK) {
+            if (null != HDFS_STORAGE) {
+                return HDFS_STORAGE;
+            }
+            HDFS_STORAGE = new HdfsStorage(config);
+            return HDFS_STORAGE;
+        }
+    }
+
+}

Reply via email to