This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch fixing_hadoop_common_scope in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1398dd37891ef0673e7d0194141af684aee625e1 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Fri Apr 17 14:23:10 2020 -0700 Remove hadoop dependency in Create Segment Command --- .../pinot/spi/filesystem/PinotFSFactory.java | 11 ++- .../tools/admin/command/CreateSegmentCommand.java | 82 ++++++---------------- 2 files changed, 25 insertions(+), 68 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java index 38d0eff..97c1fd0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java @@ -39,8 +39,12 @@ public class PinotFSFactory { private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class); private static final String LOCAL_PINOT_FS_SCHEME = "file"; private static final String CLASS = "class"; + private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() { + { + put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS()); + } + }; - private static Map<String, PinotFS> PINOT_FS_MAP = new HashMap<>(); public static void register(String scheme, String fsClassName, Configuration configuration) { try { @@ -66,11 +70,6 @@ public class PinotFSFactory { LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, fsClassName); register(key, fsClassName, fsConfig.subset(key)); } - - if (!PINOT_FS_MAP.containsKey(LOCAL_PINOT_FS_SCHEME)) { - LOGGER.info("LocalPinotFS not configured, adding as default"); - PINOT_FS_MAP.put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS()); - } } public static PinotFS create(String scheme) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java index 6041dc8..67762b2 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java @@ -21,17 +21,11 @@ package org.apache.pinot.tools.admin.command; import java.io.File; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; @@ -42,6 +36,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; @@ -269,22 +265,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co } // Filter out all input files. - final Path dataDirPath = new Path(_dataDir); - FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration()); + URI dataDirURI = URI.create(_dataDir); + if (dataDirURI.getScheme() == null) { + dataDirURI = new File(_dataDir).toURI(); + } + PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme()); - if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) { + if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) { throw new RuntimeException("Data directory " + _dataDir + " not found."); } // Gather all data files - List<Path> dataFilePaths = getDataFilePaths(dataDirPath); + String[] dataFilePaths = pinotFS.listFiles(dataDirURI, true); - if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) { + if ((dataFilePaths == null) || (dataFilePaths.length == 0)) { throw new RuntimeException( "Data directory " + _dataDir + " does not contain " + _format.toString().toUpperCase() + " files."); } - LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths.toArray())); + LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths)); // Make sure output directory does not already exist, or can be overwritten. File outDir = new File(_outDir); @@ -325,7 +324,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co ExecutorService executor = Executors.newFixedThreadPool(_numThreads); int cnt = 0; - for (final Path dataFilePath : dataFilePaths) { + for (final String dataFilePath : dataFilePaths) { final int segCnt = cnt; executor.execute(new Runnable() { @@ -334,10 +333,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co for (int curr = 0; curr <= _retry; curr++) { try { SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig); - - String localFile = dataFilePath.getName(); - Path localFilePath = new Path(localFile); - dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath); + URI dataFileUri = URI.create(dataFilePath); + String[] splits = dataFilePath.split("/"); + String localFile = splits[splits.length - 1]; + pinotFS.copyToLocalFile(dataFileUri, new File(localFile)); config.setInputFilePath(localFile); config.setSegmentName(_segmentName + "_" + segCnt); Schema schema = Schema.fromFile(new File(_schemaFile)); @@ -398,9 +397,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co try { try { localTempDir.getParentFile().mkdirs(); - FileSystem.get(URI.create(indexDir.toString()), new Configuration()) - .copyToLocalFile(new Path(indexDir.toString()), new Path(localTempDir.toString())); - } catch (IOException e) { + URI indexDirUri = URI.create(indexDir.toString()); + PinotFS pinotFs = PinotFSFactory.create(indexDirUri.getScheme()); + pinotFs.copyToLocalFile(indexDirUri, localTempDir); + } catch (Exception e) { LOGGER.error("Failed to copy segment {} to local directory {} for verification.", indexDir, localTempDir, e); return false; } @@ -418,46 +418,4 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co FileUtils.deleteQuietly(localTempDir); } } - - protected List<Path> getDataFilePaths(Path pathPattern) - throws IOException { - List<Path> tarFilePaths = new ArrayList<>(); - FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration()); - getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths); - return tarFilePaths; - } - - protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths) - throws IOException { - for (FileStatus fileStatus : fileStatuses) { - Path path = fileStatus.getPath(); - if (fileStatus.isDirectory()) { - getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths); - } else { - if (isDataFile(path.getName())) { - tarFilePaths.add(path); - } - } - } - } - - protected boolean isDataFile(String fileName) { - switch (_format) { - case AVRO: - case GZIPPED_AVRO: - return fileName.endsWith(".avro"); - case CSV: - return fileName.endsWith(".csv"); - case JSON: - return fileName.endsWith(".json"); - case THRIFT: - return fileName.endsWith(".thrift"); - case PARQUET: - return fileName.endsWith(".parquet"); - case ORC: - return fileName.endsWith(".orc"); - default: - throw new IllegalStateException("Unsupported file format for segment creation: " + _format); - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org