This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dac899f  Changing segmentCommitEnd to support deep storage (#3578)
dac899f is described below

commit dac899fbc19dc936b5684bd076d83229bc475c1d
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Dec 3 15:31:04 2018 -0800

    Changing segmentCommitEnd to support deep storage (#3578)
    
    * Changing the way we move the file to its final location to use PinotFS so
      that we can support different storage backends
    * Testing covered by realtime integration tests
    * Part 2 will be making the metadata portion backwards compatible as well
---
 .../linkedin/pinot/controller/ControllerConf.java  |  2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 40 ++++++++++------------
 .../PinotLLCRealtimeSegmentManagerTest.java        |  4 +++
 3 files changed, 24 insertions(+), 22 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index e18f13a..f7214bd 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration 
{
   }
 
   public String getLocalTempDir() {
-    return getString(LOCAL_TEMP_DIR, null);
+    return getString(LOCAL_TEMP_DIR, getDataDir());
   }
 
   public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) {
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9cedc35..b1ff634 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -59,11 +59,12 @@ import 
com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager {
   public boolean commitSegmentFile(String tableName, 
CommittingSegmentDescriptor committingSegmentDescriptor) {
     String segmentName = committingSegmentDescriptor.getSegmentName();
     String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
-    File segmentFile = convertURIToSegmentLocation(segmentLocation);
-
-    File baseDir = new File(_controllerConf.getDataDir());
-    File tableDir = new File(baseDir, tableName);
-    File fileToMoveTo = new File(tableDir, segmentName);
+    URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
+    URI baseDirURI = 
ControllerConf.getUriFromPath(_controllerConf.getDataDir());
+    URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", 
_controllerConf.getDataDir(), tableName));
+    URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", 
tableDirURI.toString(), segmentName));
+    PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
     if (!isConnected() || !isLeader()) {
       // We can potentially log a different value than what we saw ....
@@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     try {
-      
com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile, 
fileToMoveTo);
+      pinotFS.move(segmentFileURI, uriToMoveTo, true);
     } catch (Exception e) {
-      LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e);
+      LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
       return false;
     }
-    for (File file : tableDir.listFiles()) {
-      if 
(file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName)))
 {
-        LOGGER.warn("Deleting " + file);
-        FileUtils.deleteQuietly(file);
-      }
-    }
-    return true;
-  }
 
-  private static File convertURIToSegmentLocation(String segmentLocation) {
     try {
-      URI uri = new URI(segmentLocation);
-      return new File(uri.getPath());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Could not convert URI " + segmentLocation + 
" to segment location", e);
+      for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+        if 
(uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
+          LOGGER.warn("Deleting " + uri);
+          pinotFS.delete(new URI(uri), true);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Could not delete tmp segment files for {}", tableDirURI, e);
     }
+
+    return true;
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 14f00dd..a5653a8 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -45,6 +45,7 @@ import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
 import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +60,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nonnull;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -1120,6 +1122,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
   @Test
   public void testCommitSegmentFile() throws Exception {
+    PinotFSFactory.init(new PropertiesConfiguration());
     PinotLLCRealtimeSegmentManager realtimeSegmentManager =
         new 
FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
     String tableName = "fakeTable_REALTIME";
@@ -1138,6 +1141,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
   @Test
   public void testSegmentAlreadyThereAndExtraneousFilesDeleted() throws 
Exception {
+    PinotFSFactory.init(new PropertiesConfiguration());
     PinotLLCRealtimeSegmentManager realtimeSegmentManager =
         new 
FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
     String tableName = "fakeTable_REALTIME";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to