This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_hadoop_common_scope
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1398dd37891ef0673e7d0194141af684aee625e1
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Fri Apr 17 14:23:10 2020 -0700

    Remove hadoop dependency in Create Segment Command
---
 .../pinot/spi/filesystem/PinotFSFactory.java       | 11 ++-
 .../tools/admin/command/CreateSegmentCommand.java  | 82 ++++++----------------
 2 files changed, 25 insertions(+), 68 deletions(-)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index 38d0eff..97c1fd0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -39,8 +39,12 @@ public class PinotFSFactory {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
   private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
+  private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
+    {
+      put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
+    }
+  };
 
-  private static Map<String, PinotFS> PINOT_FS_MAP = new HashMap<>();
 
   public static void register(String scheme, String fsClassName, Configuration configuration) {
     try {
@@ -66,11 +70,6 @@ public class PinotFSFactory {
       LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, 
fsClassName);
       register(key, fsClassName, fsConfig.subset(key));
     }
-
-    if (!PINOT_FS_MAP.containsKey(LOCAL_PINOT_FS_SCHEME)) {
-      LOGGER.info("LocalPinotFS not configured, adding as default");
-      PINOT_FS_MAP.put(LOCAL_PINOT_FS_SCHEME, new LocalPinotFS());
-    }
   }
 
   public static PinotFS create(String scheme) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 6041dc8..67762b2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -21,17 +21,11 @@ package org.apache.pinot.tools.admin.command;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -42,6 +36,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
@@ -269,22 +265,25 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     }
 
     // Filter out all input files.
-    final Path dataDirPath = new Path(_dataDir);
-    FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration());
+    URI dataDirURI = URI.create(_dataDir);
+    if (dataDirURI.getScheme() == null) {
+      dataDirURI = new File(_dataDir).toURI();
+    }
+    PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
 
-    if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) {
+    if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) {
       throw new RuntimeException("Data directory " + _dataDir + " not found.");
     }
 
     // Gather all data files
-    List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
+    String[] dataFilePaths = pinotFS.listFiles(dataDirURI, true);
 
-    if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
+    if ((dataFilePaths == null) || (dataFilePaths.length == 0)) {
       throw new RuntimeException(
           "Data directory " + _dataDir + " does not contain " + 
_format.toString().toUpperCase() + " files.");
     }
 
-    LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths.toArray()));
+    LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths));
 
     // Make sure output directory does not already exist, or can be overwritten.
     File outDir = new File(_outDir);
@@ -325,7 +324,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
 
     ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
     int cnt = 0;
-    for (final Path dataFilePath : dataFilePaths) {
+    for (final String dataFilePath : dataFilePaths) {
       final int segCnt = cnt;
 
       executor.execute(new Runnable() {
@@ -334,10 +333,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
           for (int curr = 0; curr <= _retry; curr++) {
             try {
               SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig);
-
-              String localFile = dataFilePath.getName();
-              Path localFilePath = new Path(localFile);
-              dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath);
+              URI dataFileUri = URI.create(dataFilePath);
+              String[] splits = dataFilePath.split("/");
+              String localFile = splits[splits.length - 1];
+              pinotFS.copyToLocalFile(dataFileUri, new File(localFile));
               config.setInputFilePath(localFile);
               config.setSegmentName(_segmentName + "_" + segCnt);
               Schema schema = Schema.fromFile(new File(_schemaFile));
@@ -398,9 +397,10 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     try {
       try {
         localTempDir.getParentFile().mkdirs();
-        FileSystem.get(URI.create(indexDir.toString()), new Configuration())
-            .copyToLocalFile(new Path(indexDir.toString()), new Path(localTempDir.toString()));
-      } catch (IOException e) {
+        URI indexDirUri = URI.create(indexDir.toString());
+        PinotFS pinotFs = PinotFSFactory.create(indexDirUri.getScheme());
+        pinotFs.copyToLocalFile(indexDirUri, localTempDir);
+      } catch (Exception e) {
         LOGGER.error("Failed to copy segment {} to local directory {} for 
verification.", indexDir, localTempDir, e);
         return false;
       }
@@ -418,46 +418,4 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
       FileUtils.deleteQuietly(localTempDir);
     }
   }
-
-  protected List<Path> getDataFilePaths(Path pathPattern)
-      throws IOException {
-    List<Path> tarFilePaths = new ArrayList<>();
-    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
-    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
-    return tarFilePaths;
-  }
-
-  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
-      throws IOException {
-    for (FileStatus fileStatus : fileStatuses) {
-      Path path = fileStatus.getPath();
-      if (fileStatus.isDirectory()) {
-        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
-      } else {
-        if (isDataFile(path.getName())) {
-          tarFilePaths.add(path);
-        }
-      }
-    }
-  }
-
-  protected boolean isDataFile(String fileName) {
-    switch (_format) {
-      case AVRO:
-      case GZIPPED_AVRO:
-        return fileName.endsWith(".avro");
-      case CSV:
-        return fileName.endsWith(".csv");
-      case JSON:
-        return fileName.endsWith(".json");
-      case THRIFT:
-        return fileName.endsWith(".thrift");
-      case PARQUET:
-        return fileName.endsWith(".parquet");
-      case ORC:
-        return fileName.endsWith(".orc");
-      default:
-        throw new IllegalStateException("Unsupported file format for segment creation: " + _format);
-    }
-  }
 }
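
For context, here is a minimal sketch (not part of the commit) of how the PinotFS-based flow introduced above is expected to be used: resolve a scheme-less path against the local file system, obtain a PinotFS from PinotFSFactory, list input files recursively, and copy them to local disk. The class name PinotFsListingSketch and the /tmp/pinot-input path are hypothetical; the PinotFS calls are the ones used by the new CreateSegmentCommand code.

import java.io.File;
import java.net.URI;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;

public class PinotFsListingSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical input directory; a path without a scheme falls back to the
    // local file system, mirroring the scheme check added in CreateSegmentCommand.
    String dataDir = "/tmp/pinot-input";
    URI dataDirURI = URI.create(dataDir);
    if (dataDirURI.getScheme() == null) {
      dataDirURI = new File(dataDir).toURI();
    }

    // With this commit, LocalPinotFS is registered for the "file" scheme by default,
    // so no Hadoop Configuration or explicit registration is needed for local paths.
    PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
    if (!pinotFS.exists(dataDirURI) || !pinotFS.isDirectory(dataDirURI)) {
      throw new RuntimeException("Data directory " + dataDir + " not found.");
    }

    // Recursive listing replaces the old Hadoop globStatus/listStatus traversal.
    for (String dataFilePath : pinotFS.listFiles(dataDirURI, true)) {
      // Copy each file to the working directory under its last path component,
      // as the command now does before segment generation.
      String[] splits = dataFilePath.split("/");
      pinotFS.copyToLocalFile(URI.create(dataFilePath), new File(splits[splits.length - 1]));
    }
  }
}

For non-local schemes such as hdfs, the matching PinotFS implementation still has to be registered through PinotFSFactory.register(scheme, fsClassName, configuration) before the command runs; only the "file" scheme is registered by default after this change.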


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
