This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch bugfixing_hadoop_filesystem
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 52f140aacd981eb1fe3b873fb34befe5643078cb Author: Xiang Fu <[email protected]> AuthorDate: Mon Sep 30 10:37:50 2019 -0700 more logging --- pinot-hadoop/pom.xml | 11 +++++++++++ .../main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java | 8 +++++++- .../java/org/apache/pinot/hadoop/job/SegmentCreationJob.java | 6 +++--- .../pinot/hadoop/job/mappers/SegmentCreationMapper.java | 2 +- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml index f6c2b55..8e2babb 100644 --- a/pinot-hadoop/pom.xml +++ b/pinot-hadoop/pom.xml @@ -43,6 +43,7 @@ <plugins> <plugin> <artifactId>maven-shade-plugin</artifactId> + <version>3.2.1</version> <executions> <execution> <phase>package</phase> @@ -50,6 +51,16 @@ <goal>shade</goal> </goals> <configuration> + <relocations> + <relocation> + <pattern>com.google.common.base.Preconditions</pattern> + <shadedPattern>shaded.com.google.common.base.Preconditions</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern> + </relocation> + </relocations> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java index 7828fdf..fb39ac1 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/BaseSegmentJob.java @@ -62,7 +62,13 @@ public abstract class BaseSegmentJob extends Configured { throws IOException { List<Path> tarFilePaths = new ArrayList<>(); FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), _conf); - getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), 
tarFilePaths); + _logger.info("Using filesystem: {}", fileSystem); + final FileStatus[] fileStatuses = fileSystem.globStatus(pathPattern); + if (fileStatuses == null) { + _logger.warn("Unable to match file status from file path pattern: {}", pathPattern); + } else { + getDataFilePathsHelper(fileSystem, fileStatuses, tarFilePaths); + } return tarFilePaths; } diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java index aa68f10..9807757 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java @@ -109,7 +109,7 @@ public class SegmentCreationJob extends BaseSegmentJob { return true; } return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName - .endsWith(".thrift"); + .endsWith(".thrift") || fileName.endsWith(".parquet"); } public void run() @@ -117,7 +117,7 @@ public class SegmentCreationJob extends BaseSegmentJob { _logger.info("Starting {}", getClass().getSimpleName()); // Initialize all directories - _fileSystem = FileSystem.get(_conf); + _fileSystem = FileSystem.get(_outputDir.toUri(), _conf); JobPreparationHelper.mkdirs(_fileSystem, _outputDir, _defaultPermissionsMask); JobPreparationHelper.mkdirs(_fileSystem, _stagingDir, _defaultPermissionsMask); Path stagingInputDir = new Path(_stagingDir, "input"); @@ -208,7 +208,7 @@ public class SegmentCreationJob extends BaseSegmentJob { if (controllerRestApi != null) { return controllerRestApi.getSchema(); } else { - try (InputStream inputStream = _fileSystem.open(_schemaFile)) { + try (InputStream inputStream = FileSystem.get(_schemaFile.toUri(), getConf()).open(_schemaFile)) { return Schema.fromInputSteam(inputStream); } } diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 013104a..823c370 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -148,7 +148,6 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab Preconditions.checkState(_localSegmentDir.mkdir()); Preconditions.checkState(_localSegmentTarDir.mkdir()); - _fileSystem = FileSystem.get(context.getConfiguration()); _logger.info("*********************************************************************"); _logger.info("Raw Table Name: {}", _rawTableName); @@ -194,6 +193,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab Path hdfsInputFile = new Path(splits[0]); int sequenceId = Integer.parseInt(splits[1]); + _fileSystem = FileSystem.get(hdfsInputFile.toUri(), context.getConfiguration()); _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId); String inputFileName = hdfsInputFile.getName(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
