This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dac899f Changing segmentCommitEnd to support deep storage (#3578)
dac899f is described below
commit dac899fbc19dc936b5684bd076d83229bc475c1d
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Dec 3 15:31:04 2018 -0800
Changing segmentCommitEnd to support deep storage (#3578)
* Changing the way we move file to final location to use pinotFS so that we
can support different storage backends
* Testing covered by realtime integration tests
* Part 2 will be making the metadata portion backwards compatible as well
---
.../linkedin/pinot/controller/ControllerConf.java | 2 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 40 ++++++++++------------
.../PinotLLCRealtimeSegmentManagerTest.java | 4 +++
3 files changed, 24 insertions(+), 22 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index e18f13a..f7214bd 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration
{
}
public String getLocalTempDir() {
- return getString(LOCAL_TEMP_DIR, null);
+ return getString(LOCAL_TEMP_DIR, getDataDir());
}
public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) {
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9cedc35..b1ff634 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -59,11 +59,12 @@ import
com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager {
public boolean commitSegmentFile(String tableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String segmentName = committingSegmentDescriptor.getSegmentName();
String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
- File segmentFile = convertURIToSegmentLocation(segmentLocation);
-
- File baseDir = new File(_controllerConf.getDataDir());
- File tableDir = new File(baseDir, tableName);
- File fileToMoveTo = new File(tableDir, segmentName);
+ URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
+ URI baseDirURI =
ControllerConf.getUriFromPath(_controllerConf.getDataDir());
+ URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/",
_controllerConf.getDataDir(), tableName));
+ URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/",
tableDirURI.toString(), segmentName));
+ PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
if (!isConnected() || !isLeader()) {
// We can potentially log a different value than what we saw ....
@@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager {
}
try {
-
com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile,
fileToMoveTo);
+ pinotFS.move(segmentFileURI, uriToMoveTo, true);
} catch (Exception e) {
- LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e);
+ LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
return false;
}
- for (File file : tableDir.listFiles()) {
- if
(file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName)))
{
- LOGGER.warn("Deleting " + file);
- FileUtils.deleteQuietly(file);
- }
- }
- return true;
- }
- private static File convertURIToSegmentLocation(String segmentLocation) {
try {
- URI uri = new URI(segmentLocation);
- return new File(uri.getPath());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Could not convert URI " + segmentLocation +
" to segment location", e);
+ for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+ if
(uri.contains(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
+ LOGGER.warn("Deleting " + uri);
+ pinotFS.delete(new URI(uri), true);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Could not delete tmp segment files for {}", tableDirURI, e);
}
+
+ return true;
}
/**
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 14f00dd..a5653a8 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -45,6 +45,7 @@ import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
import java.io.IOException;
@@ -59,6 +60,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nonnull;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
@@ -1120,6 +1122,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testCommitSegmentFile() throws Exception {
+ PinotFSFactory.init(new PropertiesConfiguration());
PinotLLCRealtimeSegmentManager realtimeSegmentManager =
new
FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
String tableName = "fakeTable_REALTIME";
@@ -1138,6 +1141,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Test
public void testSegmentAlreadyThereAndExtraneousFilesDeleted() throws
Exception {
+ PinotFSFactory.init(new PropertiesConfiguration());
PinotLLCRealtimeSegmentManager realtimeSegmentManager =
new
FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
String tableName = "fakeTable_REALTIME";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]