This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch segmentcommitend
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 625482337f87474fce3faeaf49663b96ddda660a
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Dec 3 10:46:27 2018 -0800

    Changing segmentCommitEnd to support deep storage
---
 .../linkedin/pinot/controller/ControllerConf.java  |  2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 49 ++++++++++++----------
 2 files changed, 28 insertions(+), 23 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index e18f13a..f7214bd 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration 
{
   }
 
   public String getLocalTempDir() {
-    return getString(LOCAL_TEMP_DIR, null);
+    return getString(LOCAL_TEMP_DIR, getDataDir());
   }
 
   public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) {
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9cedc35..01da549 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -59,11 +59,12 @@ import 
com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager {
   public boolean commitSegmentFile(String tableName, 
CommittingSegmentDescriptor committingSegmentDescriptor) {
     String segmentName = committingSegmentDescriptor.getSegmentName();
     String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
-    File segmentFile = convertURIToSegmentLocation(segmentLocation);
-
-    File baseDir = new File(_controllerConf.getDataDir());
-    File tableDir = new File(baseDir, tableName);
-    File fileToMoveTo = new File(tableDir, segmentName);
+    URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
+    URI baseDirURI = 
ControllerConf.getUriFromPath(_controllerConf.getDataDir());
+    URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", 
_controllerConf.getDataDir(), tableName));
+    URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", 
tableDirURI.toString(), segmentName));
+    PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
     if (!isConnected() || !isLeader()) {
       // We can potentially log a different value than what we saw ....
@@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     try {
-      
com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile, 
fileToMoveTo);
+      pinotFS.move(segmentFileURI, uriToMoveTo, true);
     } catch (Exception e) {
-      LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e);
+      LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
       return false;
     }
-    for (File file : tableDir.listFiles()) {
-      if 
(file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName)))
 {
-        LOGGER.warn("Deleting " + file);
-        FileUtils.deleteQuietly(file);
-      }
-    }
-    return true;
-  }
 
-  private static File convertURIToSegmentLocation(String segmentLocation) {
     try {
-      URI uri = new URI(segmentLocation);
-      return new File(uri.getPath());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Could not convert URI " + segmentLocation + 
" to segment location", e);
+      for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+        if 
(uri.startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
+          LOGGER.warn("Deleting " + uri);
+          pinotFS.delete(new URI(uri), true);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Could not tmp segment files");
     }
+
+    return true;
   }
 
   /**
@@ -652,12 +650,19 @@ public class PinotLLCRealtimeSegmentManager {
   protected SegmentMetadataImpl extractSegmentMetadata(final String 
rawTableName, final String segmentNameStr) {
     String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), 
rawTableName);
     String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
-    String tempMetadataDirStr = StringUtil.join("/", baseDirStr, 
segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+    String localTempDir = _controllerConf.getLocalTempDir();
+    String tempMetadataDirStr = StringUtil.join("/", localTempDir, 
segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+    PinotFS pinotFS = 
PinotFSFactory.create(ControllerConf.getUriFromPath(baseDirStr).getScheme());
+
     File tempMetadataDir = new File(tempMetadataDirStr);
 
     try {
       Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create 
directory: %s", tempMetadataDirStr);
 
+      pinotFS.copyToLocalFile(ControllerConf.getUriFromPath(segFileStr), new 
File(tempMetadataDirStr));
+
       // Extract metadata.properties
       InputStream metadataPropertiesInputStream =
           TarGzCompressionUtils.unTarOneFile(new FileInputStream(new 
File(segFileStr)),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to