This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch bugfixing_hadoop_filesystem
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6c3e4cfeab52c5d8ddb4b8e763def76dbc06d3dc
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Sep 30 10:37:50 2019 -0700

    more logging
---
 .../src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java | 8 +++++++-
 .../main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java | 6 +++---
 .../apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java    | 2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
index 7828fdf..fb39ac1 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java
@@ -62,7 +62,13 @@ public abstract class BaseSegmentJob extends Configured {
       throws IOException {
     List<Path> tarFilePaths = new ArrayList<>();
     FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), _conf);
-    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
+    _logger.info("Using filesystem: {}", fileSystem);
+    final FileStatus[] fileStatuses = fileSystem.globStatus(pathPattern);
+    if (fileStatuses == null) {
+      _logger.warn("Unable to match file status from file path pattern: {}", pathPattern);
+    } else {
+      getDataFilePathsHelper(fileSystem, fileStatuses, tarFilePaths);
+    }
     return tarFilePaths;
   }
 
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index aa68f10..9807757 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -109,7 +109,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
       return true;
     }
     return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
-        .endsWith(".thrift");
+        .endsWith(".thrift") || fileName.endsWith(".parquet");
   }
 
   public void run()
@@ -117,7 +117,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
     _logger.info("Starting {}", getClass().getSimpleName());
 
     // Initialize all directories
-    _fileSystem = FileSystem.get(_conf);
+    _fileSystem = FileSystem.get(_outputDir.toUri(), _conf);
     JobPreparationHelper.mkdirs(_fileSystem, _outputDir, _defaultPermissionsMask);
     JobPreparationHelper.mkdirs(_fileSystem, _stagingDir, _defaultPermissionsMask);
     Path stagingInputDir = new Path(_stagingDir, "input");
@@ -208,7 +208,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
       if (controllerRestApi != null) {
         return controllerRestApi.getSchema();
       } else {
-        try (InputStream inputStream = _fileSystem.open(_schemaFile)) {
+        try (InputStream inputStream = FileSystem.get(_schemaFile.toUri(), getConf()).open(_schemaFile)) {
           return Schema.fromInputSteam(inputStream);
         }
       }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 013104a..823c370 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -148,7 +148,6 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     Preconditions.checkState(_localSegmentDir.mkdir());
     Preconditions.checkState(_localSegmentTarDir.mkdir());
 
-    _fileSystem = FileSystem.get(context.getConfiguration());
 
     _logger.info("*********************************************************************");
     _logger.info("Raw Table Name: {}", _rawTableName);
@@ -194,6 +193,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
 
     Path hdfsInputFile = new Path(splits[0]);
     int sequenceId = Integer.parseInt(splits[1]);
+    _fileSystem = FileSystem.get(hdfsInputFile.toUri(), context.getConfiguration());
     _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);
 
     String inputFileName = hdfsInputFile.getName();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to