This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3216159 Support aws s3 with Parquet in pinot-tools (#4556)
3216159 is described below
commit 3216159261d7bbca28beba9dd3458520f9b40af5
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Aug 30 10:54:50 2019 -0700
Support aws s3 with Parquet in pinot-tools (#4556)
* Support aws s3 with Parquet in pinot-tools
* resolve dependency issue
* Address comments
---
.../apache/pinot/core/data/readers/FileFormat.java | 2 +-
pinot-tools/pom.xml | 20 ++++++
.../tools/admin/command/CreateSegmentCommand.java | 74 ++++++++++++++++++----
pom.xml | 5 ++
4 files changed, 86 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 5f7120d..43f8391 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
package org.apache.pinot.core.data.readers;
public enum FileFormat {
- AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
+ AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
}
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 55c2ecd..0799c97 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,6 +56,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-parquet</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-connector-kafka-base</artifactId>
<version>${project.version}</version>
</dependency>
@@ -98,6 +103,21 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 95c1371..5ce2c79 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -19,20 +19,29 @@
package org.apache.pinot.tools.admin.command;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pinot.common.data.StarTreeIndexSpec;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.data.readers.RecordReaderFactory;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.parquet.data.readers.ParquetRecordReader;
import org.apache.pinot.startree.hll.HllConfig;
import org.apache.pinot.startree.hll.HllConstants;
import org.apache.pinot.tools.Command;
@@ -283,24 +292,22 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
}
// Filter out all input files.
- File dir = new File(_dataDir);
- if (!dir.exists() || !dir.isDirectory()) {
+ final Path dataDirPath = new Path(_dataDir);
+ FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new
Configuration());
+
+ if (!fileSystem.exists(dataDirPath) ||
!fileSystem.isDirectory(dataDirPath)) {
throw new RuntimeException("Data directory " + _dataDir + " not found.");
}
- File[] files = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(_format.toString().toLowerCase());
- }
- });
+ // Gather all data files
+ List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
- if ((files == null) || (files.length == 0)) {
+ if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
throw new RuntimeException(
"Data directory " + _dataDir + " does not contain " +
_format.toString().toUpperCase() + " files.");
}
- LOGGER.info("Accepted files: {}", Arrays.toString(files));
+ LOGGER.info("Accepted files: {}",
Arrays.toString(dataFilePaths.toArray()));
// Make sure output directory does not already exist, or can be
overwritten.
File outDir = new File(_outDir);
@@ -362,7 +369,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
int cnt = 0;
- for (final File file : files) {
+ for (final Path dataFilePath : dataFilePaths) {
final int segCnt = cnt;
executor.execute(new Runnable() {
@@ -370,12 +377,24 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
public void run() {
try {
SegmentGeneratorConfig config = new
SegmentGeneratorConfig(segmentGeneratorConfig);
- config.setInputFilePath(file.getAbsolutePath());
+
+ String localFile = dataFilePath.getName();
+ Path localFilePath = new Path(localFile);
+ dataDirPath.getFileSystem(new
Configuration()).copyToLocalFile(dataFilePath, localFilePath);
+ config.setInputFilePath(localFile);
config.setSegmentName(_segmentName + "_" + segCnt);
config.loadConfigFiles();
final SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config);
+ switch (config.getFormat()) {
+ case PARQUET:
+ RecordReader parquetRecordReader = new ParquetRecordReader();
+ parquetRecordReader.init(config);
+ driver.init(config, parquetRecordReader);
+ break;
+ default:
+ driver.init(config);
+ }
driver.build();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -388,4 +407,31 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
executor.shutdown();
return executor.awaitTermination(1, TimeUnit.HOURS);
}
+
+ protected List<Path> getDataFilePaths(Path pathPattern)
+ throws IOException {
+ List<Path> tarFilePaths = new ArrayList<>();
+ FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new
Configuration());
+ getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern),
tarFilePaths);
+ return tarFilePaths;
+ }
+
+ protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[]
fileStatuses, List<Path> tarFilePaths)
+ throws IOException {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ if (fileStatus.isDirectory()) {
+ getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path),
tarFilePaths);
+ } else {
+ if (isDataFile(path.getName())) {
+ tarFilePaths.add(path);
+ }
+ }
+ }
+ }
+
+ protected boolean isDataFile(String fileName) {
+ return fileName.endsWith(".avro") || fileName.endsWith(".csv") ||
fileName.endsWith(".json") || fileName
+ .endsWith(".thrift") || fileName.endsWith(".parquet");
+ }
}
diff --git a/pom.xml b/pom.xml
index cc8767c..d770f29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -657,6 +657,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.5.2</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]