This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch segmentcommitend in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 625482337f87474fce3faeaf49663b96ddda660a Author: Jennifer Dai <[email protected]> AuthorDate: Mon Dec 3 10:46:27 2018 -0800 Changing segmentCommitEnd to support deep storage --- .../linkedin/pinot/controller/ControllerConf.java | 2 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 49 ++++++++++++---------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java index e18f13a..f7214bd 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java @@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration { } public String getLocalTempDir() { - return getString(LOCAL_TEMP_DIR, null); + return getString(LOCAL_TEMP_DIR, getDataDir()); } public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) { diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 9cedc35..01da549 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -59,11 +59,12 @@ import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties; import com.linkedin.pinot.core.segment.creator.impl.V1Constants; import com.linkedin.pinot.core.segment.index.ColumnMetadata; import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl; +import com.linkedin.pinot.filesystem.PinotFS; +import com.linkedin.pinot.filesystem.PinotFSFactory; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager { public boolean commitSegmentFile(String tableName, CommittingSegmentDescriptor committingSegmentDescriptor) { String segmentName = committingSegmentDescriptor.getSegmentName(); String segmentLocation = committingSegmentDescriptor.getSegmentLocation(); - File segmentFile = convertURIToSegmentLocation(segmentLocation); - - File baseDir = new File(_controllerConf.getDataDir()); - File tableDir = new File(baseDir, tableName); - File fileToMoveTo = new File(tableDir, segmentName); + URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation); + URI baseDirURI = ControllerConf.getUriFromPath(_controllerConf.getDataDir()); + URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", _controllerConf.getDataDir(), tableName)); + URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName)); + PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme()); if (!isConnected() || !isLeader()) { // We can potentially log a different value than what we saw .... @@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager { } try { - com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile, fileToMoveTo); + pinotFS.move(segmentFileURI, uriToMoveTo, true); } catch (Exception e) { - LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e); + LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e); return false; } - for (File file : tableDir.listFiles()) { - if (file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) { - LOGGER.warn("Deleting " + file); - FileUtils.deleteQuietly(file); - } - } - return true; - } - private static File convertURIToSegmentLocation(String segmentLocation) { try { - URI uri = new URI(segmentLocation); - return new File(uri.getPath()); - } catch (URISyntaxException e) { - throw new RuntimeException("Could not convert URI " + segmentLocation + " to segment location", e); + for (String uri : pinotFS.listFiles(tableDirURI, true)) { + if (uri.startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) { + LOGGER.warn("Deleting " + uri); + pinotFS.delete(new URI(uri), true); + } + } + } catch (Exception e) { + LOGGER.warn("Could not tmp segment files"); } + + return true; } /** @@ -652,12 +650,19 @@ public class PinotLLCRealtimeSegmentManager { protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) { String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), rawTableName); String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr); - String tempMetadataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + METADATA_TEMP_DIR_SUFFIX); + + String localTempDir = _controllerConf.getLocalTempDir(); + String tempMetadataDirStr = StringUtil.join("/", localTempDir, segmentNameStr + METADATA_TEMP_DIR_SUFFIX); + + PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(baseDirStr).getScheme()); + File tempMetadataDir = new File(tempMetadataDirStr); try { Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr); + pinotFS.copyToLocalFile(ControllerConf.getUriFromPath(segFileStr), new File(tempMetadataDirStr)); + // Extract metadata.properties InputStream metadataPropertiesInputStream = TarGzCompressionUtils.unTarOneFile(new FileInputStream(new File(segFileStr)), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
