This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot-spark
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6fbd9da6d6d16412403cda0cb9ee9a1b9c1056dd Author: Xiang Fu <[email protected]> AuthorDate: Wed Nov 6 03:05:19 2019 -0800 Initial commit for pinot-spark --- pinot-spark/pom.xml | 52 ++- .../apache/pinot/spark/PinotSparkJobLauncher.java | 80 ++++ .../apache/pinot/spark/jobs/BaseSegmentJob.java | 137 +++++++ .../apache/pinot/spark/jobs/ControllerRestApi.java | 42 ++ .../pinot/spark/jobs/DefaultControllerRestApi.java | 192 +++++++++ .../pinot/spark/jobs/JobConfigConstants.java | 65 ++++ .../pinot/spark/jobs/SegmentCreationJob.java | 427 +++++++++++++++++++++ .../pinot/spark/jobs/SegmentCreationMapper.java | 323 ++++++++++++++++ .../apache/pinot/spark/jobs/SegmentTarPushJob.java | 116 ++++++ .../apache/pinot/spark/jobs/SegmentUriPushJob.java | 68 ++++ .../pinot/spark/utils/JobPreparationHelper.java | 70 ++++ .../org/apache/pinot/spark/utils/PushLocation.java | 54 +++ pom.xml | 11 + 13 files changed, 1628 insertions(+), 9 deletions(-) diff --git a/pinot-spark/pom.xml b/pinot-spark/pom.xml index 615b21a..6c56c38 100644 --- a/pinot-spark/pom.xml +++ b/pinot-spark/pom.xml @@ -71,7 +71,7 @@ <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.pinot.hadoop.PinotHadoopJobLauncher</mainClass> + <mainClass>org.apache.pinot.spark.PinotSparkJobLauncher</mainClass> </transformer> </transformers> </configuration> @@ -95,6 +95,10 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-common</artifactId> </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -123,17 +127,34 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-core</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + <exclusion> + <groupId>javax.activation</groupId> + <artifactId>activation</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>commons-logging</groupId> @@ -149,11 +170,24 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <classifier>hadoop2</classifier> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <!--Test--> + <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> 
<scope>test</scope> diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java new file mode 100644 index 0000000..6ed9422 --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/PinotSparkJobLauncher.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark; + +import java.io.FileInputStream; +import java.util.Arrays; +import java.util.Properties; +import org.apache.pinot.spark.jobs.SegmentCreationJob; +import org.apache.pinot.spark.jobs.SegmentTarPushJob; + + +public class PinotSparkJobLauncher { + + enum PinotSparkJobType { + SegmentCreation, SegmentTarPush, SegmentCreationAndTarPush + } + + private static final String USAGE = "usage: [job_type] [job.properties]"; + private static final String SUPPORT_JOB_TYPES = + "\tsupport job types: " + Arrays.toString(PinotSparkJobType.values()); + + private static void usage() { + System.err.println(USAGE); + System.err.println(SUPPORT_JOB_TYPES); + } + + private static void kickOffPinotSparkJob(PinotSparkJobType jobType, Properties jobConf) + throws Exception { + switch (jobType) { + case SegmentCreation: + new SegmentCreationJob(jobConf).run(); + break; + case SegmentTarPush: + new SegmentTarPushJob(jobConf).run(); + break; + case SegmentCreationAndTarPush: + new SegmentCreationJob(jobConf).run(); + new SegmentTarPushJob(jobConf).run(); + break; + default: + throw new RuntimeException("Not a valid jobType - " + jobType); + } + } + + + public static void main(String[] args) + throws Exception { + if (args.length != 2) { + usage(); + System.exit(1); + } + PinotSparkJobType jobType = null; + Properties jobConf = null; + try { + jobType = PinotSparkJobType.valueOf(args[0]); + jobConf = new Properties(); + jobConf.load(new FileInputStream(args[1])); + } catch (Exception e) { + usage(); + System.exit(1); + } + kickOffPinotSparkJob(jobType, jobConf); + } +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java new file mode 100644 index 0000000..3285899 --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/BaseSegmentJob.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.common.Utils; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.spark.utils.PushLocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class BaseSegmentJob implements Serializable { + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + protected final Properties _properties; + protected final List<PushLocation> _pushLocations; + protected final String _rawTableName; + + protected BaseSegmentJob(Properties properties) { + _properties = properties; + Utils.logVersions(); + logProperties(); + + // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. + String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); + String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT); + if (pushHostsString != null && pushPortString != null) { + _pushLocations = + PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString)); + } else { + _pushLocations = null; + } + + _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + } + + @Nullable + protected TableConfig getTableConfig() + throws IOException { + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + return controllerRestApi != null ? controllerRestApi.getTableConfig() : null; + } + } + + /** + * This method is currently implemented in SegmentCreationJob and SegmentPreprocessingJob due to a dependency on + * the hadoop filesystem, which we can only get once the job begins to run. + * We return null here to make it clear that for now, all implementations of this method have to support + * reading from a schema file. In the future, we hope to deprecate reading the schema from the schema file in favor + * of mandating that a schema is pushed to the controller. + */ + @Nullable + protected org.apache.pinot.common.data.Schema getSchema() throws IOException { + return null; + } + + /** + * Can be overridden to provide custom controller Rest API. + */ + @Nullable + protected ControllerRestApi getControllerRestApi() { + return _pushLocations != null ? 
new DefaultControllerRestApi(_pushLocations, _rawTableName) : null; + } + + protected void logProperties() { + _logger.info("*********************************************************************"); + _logger.info("Job Properties: {}", _properties); + _logger.info("*********************************************************************"); + } + + @Nullable + protected Path getPathFromProperty(String key) { + String value = _properties.getProperty(key); + return value != null ? new Path(value) : null; + } + + protected List<Path> getDataFilePaths(Path pathPattern) + throws IOException { + List<Path> tarFilePaths = new ArrayList<>(); + FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration()); + _logger.info("Using filesystem: {}", fileSystem); + FileStatus[] fileStatuses = fileSystem.globStatus(pathPattern); + if (fileStatuses == null) { + _logger.warn("Unable to match file status from file path pattern: {}", pathPattern); + } else { + getDataFilePathsHelper(fileSystem, fileStatuses, tarFilePaths); + } + return tarFilePaths; + } + + protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths) + throws IOException { + for (FileStatus fileStatus : fileStatuses) { + Path path = fileStatus.getPath(); + if (fileStatus.isDirectory()) { + getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths); + } else { + // Skip temp files generated by computation frameworks like Hadoop/Spark. + if (path.getName().startsWith("_") || path.getName().startsWith(".")) { + continue; + } + if (isDataFile(path.getName())) { + tarFilePaths.add(path); + } + } + } + } + + protected abstract boolean isDataFile(String fileName); +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java new file mode 100644 index 0000000..4d0391c --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/ControllerRestApi.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spark.jobs; + +import java.io.Closeable; +import java.util.List; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.data.Schema; + + +public interface ControllerRestApi extends Closeable { + + TableConfig getTableConfig(); + + Schema getSchema(); + + void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths); + + void sendSegmentUris(List<String> segmentUris); + + void deleteSegmentUris(List<String> segmentUris); + + List<String> getAllSegments(String tableType); +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java new file mode 100644 index 0000000..c9444a9 --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/DefaultControllerRestApi.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spark.jobs; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.spark.utils.PushLocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultControllerRestApi implements ControllerRestApi { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultControllerRestApi.class); + + private final List<PushLocation> _pushLocations; + private final String _rawTableName; + private final FileUploadDownloadClient _fileUploadDownloadClient = new FileUploadDownloadClient(); + + private static final String OFFLINE = "OFFLINE"; + + public DefaultControllerRestApi(List<PushLocation> pushLocations, String rawTableName) { + LOGGER.info("Push locations are: {} for table: {}", pushLocations, rawTableName); + _pushLocations = pushLocations; + _rawTableName = rawTableName; + } + + @Override + public TableConfig getTableConfig() { + for (PushLocation pushLocation : _pushLocations) { + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient + .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName)); + JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE); + if (offlineJsonTableConfig != null) { + TableConfig offlineTableConfig = TableConfig.fromJsonConfig(offlineJsonTableConfig); + LOGGER.info("Got table config: {}", offlineTableConfig); + return offlineTableConfig; + } + } catch (Exception e) { + LOGGER.warn("Caught exception while fetching table config for table: {} from push location: {}", _rawTableName, + pushLocation, e); + } + } + String errorMessage = String + .format("Failed to get table config from push locations: %s for table: %s", _pushLocations, _rawTableName); + LOGGER.error(errorMessage); + throw new RuntimeException(errorMessage); + } + + @Override + public Schema getSchema() { + for (PushLocation pushLocation : _pushLocations) { + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(FileUploadDownloadClient + .getRetrieveSchemaHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName)); + Schema schema = Schema.fromString(response.getResponse()); + LOGGER.info("Got schema: {}", schema); + return schema; + } catch (Exception e) { + LOGGER.warn("Caught exception while fetching schema for table: {} from push location: {}", _rawTableName, + pushLocation, e); + } + } + String errorMessage = + String.format("Failed to get schema from push locations: %s for table: %s", _pushLocations, _rawTableName); + LOGGER.error(errorMessage); + throw new RuntimeException(errorMessage); + } + + @Override + public void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths) { + LOGGER.info("Start pushing segments: {} to locations: {}", tarFilePaths, _pushLocations); + for (Path tarFilePath : tarFilePaths) { + String fileName = tarFilePath.getName(); + 
Preconditions.checkArgument(fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT)); + String segmentName = fileName.substring(0, fileName.length() - JobConfigConstants.TAR_GZ_FILE_EXT.length()); + for (PushLocation pushLocation : _pushLocations) { + LOGGER.info("Pushing segment: {} to location: {}", segmentName, pushLocation); + try (InputStream inputStream = fileSystem.open(tarFilePath)) { + SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment( + FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()), + segmentName, inputStream, _rawTableName); + LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); + } catch (Exception e) { + LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e); + throw new RuntimeException(e); + } + } + } + } + + @Override + public void sendSegmentUris(List<String> segmentUris) { + LOGGER.info("Start sending segment URIs: {} to locations: {}", segmentUris, _pushLocations); + for (String segmentUri : segmentUris) { + for (PushLocation pushLocation : _pushLocations) { + LOGGER.info("Sending segment URI: {} to location: {}", segmentUri, pushLocation); + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri( + FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()), + segmentUri, _rawTableName); + LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); + } catch (Exception e) { + LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e); + throw new RuntimeException(e); + } + } + } + } + + @Override + public void deleteSegmentUris(List<String> segmentUris) { + LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations); + for (String segmentUri : segmentUris) { + for (PushLocation pushLocation : _pushLocations) { + LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation); + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest( + FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName, + segmentUri, "OFFLINE")); + LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); + } catch (Exception e) { + LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e); + throw new RuntimeException(e); + } + } + } + } + + @Override + public List<String> getAllSegments(String tableType) { + LOGGER.info("Getting all segments of table {}", _rawTableName); + ObjectMapper objectMapper = new ObjectMapper(); + for (PushLocation pushLocation : _pushLocations) { + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest( + FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(), + _rawTableName, tableType)); + JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType); + return objectMapper.convertValue(segmentList, ArrayList.class); + } catch (Exception e) { + LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName, + pushLocation, e); + } + } + String errorMessage = + String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations, + _rawTableName); + LOGGER.error(errorMessage); + 
throw new RuntimeException(errorMessage); + + } + + @Override + public void close() + throws IOException { + _fileUploadDownloadClient.close(); + } + + private JsonNode getSegmentsFromJsonSegmentAPI(String json, String tableType) + throws Exception { + return JsonUtils.stringToJsonNode(json).get(0).get(tableType); + } +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java new file mode 100644 index 0000000..f7636d3 --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/JobConfigConstants.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs; + +public class JobConfigConstants { + public static final String PATH_TO_INPUT = "path.to.input"; + public static final String PATH_TO_OUTPUT = "path.to.output"; + public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output"; + public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar"; + public static final String PATH_TO_READER_CONFIG = "path.to.reader.config"; + // Leave this for backward compatibility. We prefer to use the schema fetched from the controller. + public static final String PATH_TO_SCHEMA = "path.to.schema"; + + public static final String SEGMENT_TAR_DIR = "segmentTar"; + public static final String TAR_GZ_FILE_EXT = ".tar.gz"; + + public static final String SEGMENT_TABLE_NAME = "segment.table.name"; + public static final String TABLE_CONFIG = "table.config"; + public static final String SCHEMA = "data.schema"; + + public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type"; + public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple"; + public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate"; + public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR; + + // For SimpleSegmentNameGenerator + public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix"; + + // For NormalizedDateSegmentNameGenerator + public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix"; + public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id"; + + public static final String PUSH_TO_HOSTS = "push.to.hosts"; + public static final String PUSH_TO_PORT = "push.to.port"; + + public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode"; + + // The path to the record reader to be configured + public static final String RECORD_READER_PATH = "record.reader.path"; + + public static final String ENABLE_PREPROCESSING = "enable.preprocessing"; + + // This setting should be used if you will generate less # of segments after + // push. 
In preprocessing, this is likely because we resize segments. + public static final String DELETE_EXTRA_SEGMENTS = "delete.extra.segments"; + + // This setting is used to match output segments hierarchy along with input file hierarchy. + public static final String USE_RELATIVE_PATH = "use.relative.path"; +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java new file mode 100644 index 0000000..74f09ee --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationJob.java @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs; + +import com.google.common.base.Preconditions; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.data.TimeFieldSpec; +import org.apache.pinot.common.utils.DataSize; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.core.data.readers.CSVRecordReaderConfig; +import org.apache.pinot.core.data.readers.FileFormat; +import org.apache.pinot.core.data.readers.RecordReaderConfig; +import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; +import org.apache.pinot.spark.utils.JobPreparationHelper; +import org.apache.pinot.spark.utils.PushLocation; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class 
SegmentCreationJob extends BaseSegmentJob { + protected static final String APPEND = "APPEND"; + protected static final String LOCAL_TEMP_DIR = "pinot_spark_tmp"; + protected static final Logger LOGGER = LoggerFactory.getLogger(SegmentCreationJob.class); + protected final String _rawTableName; + + protected final String _inputPattern; + protected final String _outputDir; + protected final String _stagingDir; + // Optional + protected final String _depsJarDir; + protected final String _schemaFile; + protected final String _defaultPermissionsMask; + protected final List<PushLocation> _pushLocations; + + public SegmentCreationJob(Properties properties) { + super(properties); + new Configuration().set("mapreduce.job.user.classpath.first", "true"); + + _inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT)).toString(); + _outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT)).toString(); + _stagingDir = new Path(_outputDir, UUID.randomUUID().toString()).toString(); + _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); + + // Optional + _depsJarDir = properties.getProperty(JobConfigConstants.PATH_TO_DEPS_JAR); + _schemaFile = properties.getProperty(JobConfigConstants.PATH_TO_SCHEMA); + _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK); + + // Optional push location and table parameters. If set, will use the table config and schema from the push hosts. + String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS); + String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT); + if (pushHostsString != null && pushPortString != null) { + _pushLocations = + PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString)); + } else { + _pushLocations = null; + } + + LOGGER.info("*********************************************************************"); + LOGGER.info("Input Pattern: {}", _inputPattern); + LOGGER.info("Output Directory: {}", _outputDir); + LOGGER.info("Staging Directory: {}", _stagingDir); + LOGGER.info("Raw Table Name: {}", _rawTableName); + LOGGER.info("Dependencies Directory: {}", _depsJarDir); + LOGGER.info("Schema File: {}", _schemaFile); + LOGGER.info("Default Permissions Mask: {}", _defaultPermissionsMask); + LOGGER.info("Push Locations: {}", _pushLocations); + LOGGER.info("*********************************************************************"); + } + + /** + * Generate a relative output directory path when `useRelativePath` flag is on. + * This method will compute the relative path based on `inputFile` and `baseInputDir`, + * then apply only the directory part of relative path to `outputDir`. + * E.g. 
+ * baseInputDir = "/path/to/input" + * inputFile = "/path/to/input/a/b/c/d.avro" + * outputDir = "/path/to/output" + * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c + */ + protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) { + URI relativePath = baseInputDir.relativize(inputFile); + Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), + "Unable to extract out the relative path based on base input path: " + baseInputDir); + return new Path(outputDir, relativePath.getPath()).getParent(); + } + + protected static void createSingleSegment(String inputFile, Long seqId, Configuration conf, String stagingDir) + throws IOException { + Path hdfsInputFile = new Path(inputFile); + int sequenceId = seqId.intValue(); + LOGGER.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId); + + String rawTableName = conf.get(JobConfigConstants.SEGMENT_TABLE_NAME); + Schema schema = Schema.fromString(conf.get(JobConfigConstants.SCHEMA)); + SegmentNameGenerator segmentNameGenerator; + boolean useRelativePath = conf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false); + + // Optional + TableConfig tableConfig = null; + String recordReaderPath; + Path readerConfigFile = null; + + String tableConfigString = conf.get(JobConfigConstants.TABLE_CONFIG); + if (tableConfigString != null) { + tableConfig = TableConfig.fromJsonString(tableConfigString); + } + String readerConfigFileStr = conf.get(JobConfigConstants.PATH_TO_READER_CONFIG); + if (readerConfigFileStr != null) { + readerConfigFile = new Path(readerConfigFileStr); + } + recordReaderPath = conf.get(JobConfigConstants.RECORD_READER_PATH); + + // HDFS segment tar directory + Path hdfsSegmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR); + + // Set up segment name generator + String segmentNameGeneratorType = + conf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR); + switch (segmentNameGeneratorType) { + case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR: + segmentNameGenerator = + new SimpleSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX)); + break; + case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR: + Preconditions.checkState(tableConfig != null, + "In order to use NormalizedDateSegmentNameGenerator, table config must be provided"); + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + String timeFormat = null; + TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec(); + if (timeFieldSpec != null) { + timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat(); + } + segmentNameGenerator = + new NormalizedDateSegmentNameGenerator(rawTableName, conf.get(JobConfigConstants.SEGMENT_NAME_PREFIX), + conf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false), validationConfig.getSegmentPushType(), + validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat); + break; + default: + throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType); + } + + // Temporary local directories + File localStagingDir = new File(LOCAL_TEMP_DIR); + File localInputDir = new File(localStagingDir, "inputData"); + File localSegmentsDir = new File(localStagingDir, "segments"); + File localSegmentTarDir = new File(localStagingDir, 
JobConfigConstants.SEGMENT_TAR_DIR); + + String inputFileName = hdfsInputFile.getName(); + File localInputFile = new File(localInputDir, inputFileName); + LOGGER.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile); + FileSystem.get(hdfsInputFile.toUri(), new Configuration()) + .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath())); + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema); + segmentGeneratorConfig.setTableName(rawTableName); + segmentGeneratorConfig.setInputFilePath(localInputFile.getPath()); + segmentGeneratorConfig.setOutDir(localSegmentsDir.getPath()); + segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator); + segmentGeneratorConfig.setSequenceId(sequenceId); + if (recordReaderPath != null) { + segmentGeneratorConfig.setRecordReaderPath(recordReaderPath); + segmentGeneratorConfig.setFormat(FileFormat.OTHER); + } else { + FileFormat fileFormat = getFileFormat(inputFileName); + segmentGeneratorConfig.setFormat(fileFormat); + segmentGeneratorConfig.setReaderConfig(getReaderConfig(conf, readerConfigFile, fileFormat)); + } + segmentGeneratorConfig.setOnHeap(true); + + addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId); + + LOGGER.info("Start creating segment with sequence id: {}", sequenceId); + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + try { + driver.init(segmentGeneratorConfig); + driver.build(); + } catch (Exception e) { + LOGGER.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, + sequenceId, e); + throw new RuntimeException(e); + } + String segmentName = driver.getSegmentName(); + LOGGER.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId); + + File localSegmentDir = new File(localSegmentsDir, segmentName); + String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT; + File localSegmentTarFile = new File(localSegmentTarDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath()); + + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); + + Path hdfsSegmentTarFile = new Path(hdfsSegmentTarDir, segmentTarFileName); + if (useRelativePath) { + Path relativeOutputPath = + getRelativeOutputPath(new Path(conf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(), + hdfsSegmentTarDir); + hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName); + } + LOGGER.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile); + FileSystem.get(hdfsSegmentTarFile.toUri(), new Configuration()) + .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile); + + LOGGER.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile, + sequenceId); + } + + protected static FileFormat getFileFormat(String fileName) { + if (fileName.endsWith(".avro")) { + return FileFormat.AVRO; + } + if (fileName.endsWith(".csv")) { + return FileFormat.CSV; + } + if (fileName.endsWith(".json")) 
{ + return FileFormat.JSON; + } + if (fileName.endsWith(".thrift")) { + return FileFormat.THRIFT; + } + throw new IllegalArgumentException("Unsupported file format: {}" + fileName); + } + + @Nullable + protected static RecordReaderConfig getReaderConfig(Configuration conf, Path readerConfigFile, FileFormat fileFormat) + throws IOException { + if (readerConfigFile != null) { + if (fileFormat == FileFormat.CSV) { + try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) { + CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class); + LOGGER.info("Using CSV record reader config: {}", readerConfig); + return readerConfig; + } + } + if (fileFormat == FileFormat.THRIFT) { + try (InputStream inputStream = FileSystem.get(readerConfigFile.toUri(), conf).open(readerConfigFile)) { + ThriftRecordReaderConfig readerConfig = + JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class); + LOGGER.info("Using Thrift record reader config: {}", readerConfig); + return readerConfig; + } + } + } + return null; + } + + /** + * Can be overridden to set additional segment generator configs. + */ + @SuppressWarnings("unused") + protected static void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, + Path hdfsInputFile, int sequenceId) { + } + + @Override + protected boolean isDataFile(String fileName) { + // For custom record reader, treat all files as data file + if (_properties.getProperty(JobConfigConstants.RECORD_READER_PATH) != null) { + return true; + } + return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName + .endsWith(".thrift"); + } + + public void run() + throws Exception { + LOGGER.info("Starting {}", getClass().getSimpleName()); + + Path inputPattern = new Path(_inputPattern); + Path outputDir = new Path(_stagingDir); + Path stagingDir = new Path(_stagingDir); + + // Initialize all directories + FileSystem outputDirFileSystem = FileSystem.get(outputDir.toUri(), new Configuration()); + JobPreparationHelper.mkdirs(outputDirFileSystem, outputDir, _defaultPermissionsMask); + JobPreparationHelper.mkdirs(outputDirFileSystem, stagingDir, _defaultPermissionsMask); + Path stagingInputDir = new Path(stagingDir, "input"); + JobPreparationHelper.mkdirs(outputDirFileSystem, stagingInputDir, _defaultPermissionsMask); + + // Gather all data files + List<Path> dataFilePaths = getDataFilePaths(inputPattern); + int numDataFiles = dataFilePaths.size(); + if (numDataFiles == 0) { + String errorMessage = "No data file founded with pattern: " + inputPattern; + LOGGER.error(errorMessage); + throw new RuntimeException(errorMessage); + } else { + LOGGER.info("Creating segments with data files: {}", dataFilePaths); + for (int i = 0; i < numDataFiles; i++) { + Path dataFilePath = dataFilePaths.get(i); + try (DataOutputStream dataOutputStream = outputDirFileSystem + .create(new Path(stagingInputDir, Integer.toString(i)))) { + dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i)); + dataOutputStream.flush(); + } + } + } + + // Set up the job + List<String> dataFilePathStrs = new ArrayList<>(); + for (Path dataFilePath : dataFilePaths) { + dataFilePathStrs.add(dataFilePath.toString()); + } + + // Set table config and schema + TableConfig tableConfig = getTableConfig(); + if (tableConfig != null) { + validateTableConfig(tableConfig); + _properties.put(JobConfigConstants.TABLE_CONFIG, 
tableConfig.toJsonConfigString()); + } + _properties.put(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString()); + + JavaSparkContext sparkContext = new JavaSparkContext(); + addDepsJarToDistributedCache(sparkContext); + JavaRDD<String> pathRDD = sparkContext.parallelize(dataFilePathStrs, numDataFiles); + pathRDD.zipWithIndex().foreach(tuple2 -> { + SegmentCreationMapper segmentCreationMapper = + new SegmentCreationMapper(_properties, new Path(_stagingDir, "output").toString()); + segmentCreationMapper.run(tuple2._1, tuple2._2); + segmentCreationMapper.cleanup(); + }); + + moveSegmentsToOutputDir(outputDirFileSystem, _stagingDir, _outputDir); + + // Delete the staging directory + LOGGER.info("Deleting the staging directory: {}", stagingDir); + outputDirFileSystem.delete(stagingDir, true); + } + + @Override + protected Schema getSchema() + throws IOException { + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + if (controllerRestApi != null) { + return controllerRestApi.getSchema(); + } else { + // Schema file could be stored local or remotely. + Path schemaFilePath = new Path(_schemaFile); + try (InputStream inputStream = FileSystem.get(schemaFilePath.toUri(), new Configuration()) + .open(schemaFilePath)) { + return Schema.fromInputSteam(inputStream); + } + } + } + } + + protected void validateTableConfig(TableConfig tableConfig) { + SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); + + // For APPEND use case, timeColumnName and timeType must be set + if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) { + Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null, + "For APPEND use case, time column and type must be set"); + } + } + + protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext) + throws IOException { + if (_depsJarDir != null) { + Path depsJarPath = new Path(_depsJarDir); + JobPreparationHelper + .addDepsJarToDistributedCacheHelper(FileSystem.get(depsJarPath.toUri(), new Configuration()), sparkContext, + depsJarPath); + } + } + + protected void moveSegmentsToOutputDir(FileSystem outputDirFileSystem, String stagingDir, String outputDir) + throws IOException { + Path segmentTarDir = new Path(new Path(stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR); + for (FileStatus segmentTarStatus : outputDirFileSystem.listStatus(segmentTarDir)) { + Path segmentTarPath = segmentTarStatus.getPath(); + Path dest = new Path(outputDir, segmentTarPath.getName()); + LOGGER.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest); + outputDirFileSystem.rename(segmentTarPath, dest); + } + } +} diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java new file mode 100644 index 0000000..f9e771b --- /dev/null +++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentCreationMapper.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spark.jobs; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.net.URI; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.data.TimeFieldSpec; +import org.apache.pinot.common.utils.DataSize; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.core.data.readers.CSVRecordReaderConfig; +import org.apache.pinot.core.data.readers.FileFormat; +import org.apache.pinot.core.data.readers.RecordReaderConfig; +import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SegmentCreationMapper implements Serializable { + protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp"; + + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + + protected Configuration _jobConf; + protected String _rawTableName; + protected Schema _schema; + protected SegmentNameGenerator _segmentNameGenerator; + protected boolean _useRelativePath; + + // Optional + protected TableConfig _tableConfig; + protected String _recordReaderPath; + protected Path _readerConfigFile; + + // HDFS segment tar directory + protected Path _hdfsSegmentTarDir; + + // Temporary local directories + protected File _localStagingDir; + protected File _localInputDir; + protected File _localSegmentDir; + protected File _localSegmentTarDir; + + public SegmentCreationMapper(Properties properties, String workerOutputPath) + throws IOException { + _jobConf = new Configuration(); + for (Map.Entry<Object, Object> entry : properties.entrySet()) { + _jobConf.set(entry.getKey().toString(), entry.getValue().toString()); + } + + logConfigurations(); + + _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false); + _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME); + _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA)); + + // Optional + String tableConfigString = _jobConf.get(JobConfigConstants.TABLE_CONFIG); + if (tableConfigString != null) { + _tableConfig = TableConfig.fromJsonString(tableConfigString); + } 
+ String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG); + if (readerConfigFile != null) { + _readerConfigFile = new Path(readerConfigFile); + } + _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH); + + // Set up segment name generator + String segmentNameGeneratorType = + _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR); + switch (segmentNameGeneratorType) { + case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR: + _segmentNameGenerator = + new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX)); + break; + case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR: + Preconditions.checkState(_tableConfig != null, + "In order to use NormalizedDateSegmentNameGenerator, table config must be provided"); + SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); + String timeFormat = null; + TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec(); + if (timeFieldSpec != null) { + timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat(); + } + _segmentNameGenerator = + new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX), + _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false), + validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(), + validationConfig.getTimeType(), timeFormat); + break; + default: + throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType); + } + + // Working directories + _hdfsSegmentTarDir = new Path(workerOutputPath, JobConfigConstants.SEGMENT_TAR_DIR); + _localStagingDir = new File(LOCAL_TEMP_DIR); + _localInputDir = new File(_localStagingDir, "inputData"); + _localSegmentDir = new File(_localStagingDir, "segments"); + _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR); + + if (_localStagingDir.exists()) { + _logger.warn("Deleting existing file: {}", _localStagingDir); + FileUtils.forceDelete(_localStagingDir); + } + _logger + .info("Making local temporary directories: {}, {}, {}", _localStagingDir, _localInputDir, _localSegmentTarDir); + Preconditions.checkState(_localStagingDir.mkdirs()); + Preconditions.checkState(_localInputDir.mkdir()); + Preconditions.checkState(_localSegmentDir.mkdir()); + Preconditions.checkState(_localSegmentTarDir.mkdir()); + + _logger.info("*********************************************************************"); + _logger.info("Raw Table Name: {}", _rawTableName); + _logger.info("Schema: {}", _schema); + _logger.info("Segment Name Generator: {}", _segmentNameGenerator); + _logger.info("Table Config: {}", _tableConfig); + _logger.info("Reader Config File: {}", _readerConfigFile); + _logger.info("*********************************************************************"); + _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir); + _logger.info("Local Staging Directory: {}", _localStagingDir); + _logger.info("Local Input Directory: {}", _localInputDir); + _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir); + _logger.info("*********************************************************************"); + } + + /** + * Generate a relative output directory path when `useRelativePath` flag is on. 
+ * This method will compute the relative path based on `inputFile` and `baseInputDir`, + * then apply only the directory part of relative path to `outputDir`. + * E.g. + * baseInputDir = "/path/to/input" + * inputFile = "/path/to/input/a/b/c/d.avro" + * outputDir = "/path/to/output" + * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c + */ + protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) { + URI relativePath = baseInputDir.relativize(inputFile); + Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), + "Unable to extract out the relative path based on base input path: " + baseInputDir); + return new Path(outputDir, relativePath.getPath()).getParent(); + } + + protected void logConfigurations() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append('{'); + boolean firstEntry = true; + for (Map.Entry<String, String> entry : _jobConf) { + if (!firstEntry) { + stringBuilder.append(", "); + } else { + firstEntry = false; + } + + stringBuilder.append(entry.getKey()); + stringBuilder.append('='); + stringBuilder.append(entry.getValue()); + } + stringBuilder.append('}'); + + _logger.info("*********************************************************************"); + _logger.info("Job Configurations: {}", stringBuilder.toString()); + _logger.info("*********************************************************************"); + } + + protected void run(String hdfsInputFileString, Long seqId) + throws IOException, InterruptedException { + Path hdfsInputFile = new Path(hdfsInputFileString); + int sequenceId = seqId.intValue(); + _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId); + + String inputFileName = hdfsInputFile.getName(); + File localInputFile = new File(_localInputDir, inputFileName); + _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile); + FileSystem.get(hdfsInputFile.toUri(), _jobConf) + .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath())); + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema); + segmentGeneratorConfig.setTableName(_rawTableName); + segmentGeneratorConfig.setInputFilePath(localInputFile.getPath()); + segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath()); + segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator); + segmentGeneratorConfig.setSequenceId(sequenceId); + if (_recordReaderPath != null) { + segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath); + segmentGeneratorConfig.setFormat(FileFormat.OTHER); + } else { + FileFormat fileFormat = getFileFormat(inputFileName); + segmentGeneratorConfig.setFormat(fileFormat); + segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat)); + } + segmentGeneratorConfig.setOnHeap(true); + + addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId); + + _logger.info("Start creating segment with sequence id: {}", sequenceId); + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + + try { + driver.init(segmentGeneratorConfig); + driver.build(); + } catch (Exception e) { + _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, + sequenceId, e); + throw new RuntimeException(e); + } + String segmentName = driver.getSegmentName(); + _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, 
sequenceId); + + File localSegmentDir = new File(_localSegmentDir, segmentName); + String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT; + File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName); + _logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), localSegmentTarFile.getPath()); + + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSize.fromBytes(uncompressedSegmentSize), DataSize.fromBytes(compressedSegmentSize)); + + Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName); + if (_useRelativePath) { + Path relativeOutputPath = + getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(), + _hdfsSegmentTarDir); + hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName); + } + _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile); + FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf) + .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile); + + _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile, + sequenceId); + } + + protected FileFormat getFileFormat(String fileName) { + if (fileName.endsWith(".avro")) { + return FileFormat.AVRO; + } + if (fileName.endsWith(".csv")) { + return FileFormat.CSV; + } + if (fileName.endsWith(".json")) { + return FileFormat.JSON; + } + if (fileName.endsWith(".thrift")) { + return FileFormat.THRIFT; + } + throw new IllegalArgumentException("Unsupported file format: {}" + fileName); + } + + @Nullable + protected RecordReaderConfig getReaderConfig(FileFormat fileFormat) + throws IOException { + if (_readerConfigFile != null) { + if (fileFormat == FileFormat.CSV) { + try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) { + CSVRecordReaderConfig readerConfig = JsonUtils.inputStreamToObject(inputStream, CSVRecordReaderConfig.class); + _logger.info("Using CSV record reader config: {}", readerConfig); + return readerConfig; + } + } + if (fileFormat == FileFormat.THRIFT) { + try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) { + ThriftRecordReaderConfig readerConfig = + JsonUtils.inputStreamToObject(inputStream, ThriftRecordReaderConfig.class); + _logger.info("Using Thrift record reader config: {}", readerConfig); + return readerConfig; + } + } + } + return null; + } + + /** + * Can be overridden to set additional segment generator configs. 
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
new file mode 100644
index 0000000..259e95b
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentTarPushJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentTarPushJob extends BaseSegmentJob {
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+  private final boolean _deleteExtraSegments;
+
+  public SegmentTarPushJob(Properties properties) {
+    super(properties);
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_SEGMENTS, "false"));
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  public void run()
+      throws Exception {
+    FileSystem fileSystem = FileSystem.get(_segmentPattern.toUri(), new Configuration());
+    List<Path> segmentsToPush = getDataFilePaths(_segmentPattern);
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      // TODO: Deal with invalid prefixes in the future
+
+      List<String> currentSegments = controllerRestApi.getAllSegments("OFFLINE");
+
+      controllerRestApi.pushSegments(fileSystem, segmentsToPush);
+
+      if (_deleteExtraSegments) {
+        controllerRestApi.deleteSegmentUris(getSegmentsToDelete(currentSegments, segmentsToPush));
+      }
+    }
+  }
+
+  /**
+   * Returns the extra segments to delete after pushing to the controller.
+   * @param allSegments all segments on the controller for the table
+   * @param segmentsToPush segments that will be pushed to the controller
+   * @return segments that exist on the controller but are not part of this push
+   */
+  public List<String> getSegmentsToDelete(List<String> allSegments, List<Path> segmentsToPush) {
+    Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+    // Get all relevant segment prefixes that we are planning on pushing
+    List<String> segmentNamesToPush = segmentsToPush.stream().map(s -> s.getName()).collect(Collectors.toList());
+    for (String segmentName : segmentNamesToPush) {
+      String segmentNamePrefix = removeSequenceId(segmentName);
+      uniqueSegmentPrefixes.add(segmentNamePrefix);
+    }
+
+    List<String> relevantSegments = new ArrayList<>();
+    // Get relevant segments already pushed that we are planning on refreshing
+    for (String segmentName : allSegments) {
+      if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) {
+        relevantSegments.add(segmentName);
+      }
+    }
+
+    relevantSegments.removeAll(segmentNamesToPush);
+    return relevantSegments;
+  }
+
+  /**
+   * Removes the trailing sequence id.
+   * E.g.: If segment name is mytable_12, it will return mytable_
+   *       If segment name is mytable_20190809_20190809_12, it will return mytable_20190809_20190809_
+   * @param segmentName segment name with a trailing sequence id
+   * @return segment name prefix without the sequence id
+   */
+  private String removeSequenceId(String segmentName) {
+    return segmentName.replaceAll("\\d*$", "");
+  }
+
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
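For illustration (not part of the patch), a minimal sketch of the refresh bookkeeping done by getSegmentsToDelete() and removeSequenceId(), using hypothetical, simplified segment names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SegmentsToDeleteExample {
      // Same regex as removeSequenceId() above: strip the trailing sequence id digits.
      private static String removeSequenceId(String segmentName) {
        return segmentName.replaceAll("\\d*$", "");
      }

      public static void main(String[] args) {
        System.out.println(removeSequenceId("mytable_12"));                    // mytable_
        System.out.println(removeSequenceId("mytable_20190809_20190809_12"));  // mytable_20190809_20190809_

        // Simplified version of the getSegmentsToDelete() logic: segments already on the
        // controller that share a prefix with the pushed segments, but are not part of the
        // new push, are deleted when the delete-extra-segments flag is enabled.
        List<String> onController = Arrays.asList("mytable_20190809_20190809_0",
            "mytable_20190809_20190809_1", "mytable_20190809_20190809_2");
        List<String> pushed = Arrays.asList("mytable_20190809_20190809_0", "mytable_20190809_20190809_1");
        List<String> toDelete = onController.stream()
            .filter(segment -> removeSequenceId(segment).equals("mytable_20190809_20190809_"))
            .filter(segment -> !pushed.contains(segment))
            .collect(Collectors.toList());
        System.out.println(toDelete);  // [mytable_20190809_20190809_2]
      }
    }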
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
new file mode 100644
index 0000000..910fae3
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SegmentUriPushJob.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.jobs;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.spark.utils.PushLocation;
+
+
+public class SegmentUriPushJob extends BaseSegmentJob {
+  private final String _segmentUriPrefix;
+  private final String _segmentUriSuffix;
+  private final Path _segmentPattern;
+  private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
+
+  public SegmentUriPushJob(Properties properties) {
+    super(properties);
+    _segmentUriPrefix = properties.getProperty("uri.prefix", "");
+    _segmentUriSuffix = properties.getProperty("uri.suffix", "");
+    _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
+    String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
+    int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
+    _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(JobConfigConstants.TAR_GZ_FILE_EXT);
+  }
+
+  public void run()
+      throws Exception {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      List<Path> tarFilePaths = getDataFilePaths(_segmentPattern);
+      List<String> segmentUris = new ArrayList<>(tarFilePaths.size());
+      for (Path tarFilePath : tarFilePaths) {
+        segmentUris.add(_segmentUriPrefix + tarFilePath.toUri().getRawPath() + _segmentUriSuffix);
+      }
+      controllerRestApi.sendSegmentUris(segmentUris);
+    }
+  }
+
+  protected ControllerRestApi getControllerRestApi() {
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
+  }
+}
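For illustration (not part of the patch), a minimal sketch of the URI assembly in SegmentUriPushJob.run(); the prefix, suffix, and path below are hypothetical:

    import org.apache.hadoop.fs.Path;

    public class SegmentUriExample {
      public static void main(String[] args) {
        // Hypothetical values: uri.prefix would typically carry the scheme and authority the
        // controller should use to fetch the tar file, and uri.suffix is often empty.
        String segmentUriPrefix = "hdfs://namenode:8020";
        String segmentUriSuffix = "";
        Path tarFilePath = new Path("/output/mytable/mytable_20190809_20190809_0.tar.gz");

        // Same concatenation as in run(): prefix + raw path + suffix.
        String segmentUri = segmentUriPrefix + tarFilePath.toUri().getRawPath() + segmentUriSuffix;
        System.out.println(segmentUri);
        // hdfs://namenode:8020/output/mytable/mytable_20190809_20190809_0.tar.gz
      }
    }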
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..92bc990
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/JobPreparationHelper.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobPreparationHelper {
+  private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+  public static void mkdirs(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (fileSystem.exists(dirPath)) {
+      _logger.warn("Deleting existing file: {}", dirPath);
+      fileSystem.delete(dirPath, true);
+    }
+    _logger.info("Making directory: {}", dirPath);
+    fileSystem.mkdirs(dirPath);
+    JobPreparationHelper.setDirPermission(fileSystem, dirPath, defaultPermissionsMask);
+  }
+
+  public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, JavaSparkContext sparkContext,
+      Path depsJarDir)
+      throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        addDepsJarToDistributedCacheHelper(fileSystem, sparkContext, fileStatus.getPath());
+      } else {
+        Path depJarPath = fileStatus.getPath();
+        if (depJarPath.getName().endsWith(".jar")) {
+          _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+          sparkContext.addJar(depJarPath.toUri().getPath());
+        }
+      }
+    }
+  }
+
+  public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (defaultPermissionsMask != null) {
+      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+      fileSystem.setPermission(dirPath, permission);
+    }
+  }
+}
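For illustration (not part of the patch), a minimal sketch of how a job might call these helpers when preparing a staging directory and registering dependency jars; the paths, permission mask, and Spark master below are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.pinot.spark.utils.JobPreparationHelper;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobPreparationExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical locations; a real job would read these from its job properties.
        Path stagingDir = new Path("hdfs://namenode:8020/tmp/pinot-segment-staging");
        Path depsJarDir = new Path("hdfs://namenode:8020/apps/pinot/deps");

        FileSystem fileSystem = FileSystem.get(stagingDir.toUri(), new Configuration());
        JavaSparkContext sparkContext = new JavaSparkContext("local[*]", "pinot-segment-creation");

        // Recreate the staging directory with a "022" umask applied to the default directory
        // permission, then register every *.jar under the deps directory with the Spark context.
        JobPreparationHelper.mkdirs(fileSystem, stagingDir, "022");
        JobPreparationHelper.addDepsJarToDistributedCacheHelper(fileSystem, sparkContext, depsJarDir);

        sparkContext.stop();
      }
    }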
diff --git a/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
new file mode 100644
index 0000000..8793a1a
--- /dev/null
+++ b/pinot-spark/src/main/java/org/apache/pinot/spark/utils/PushLocation.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spark.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class PushLocation {
+  private final String _host;
+  private final int _port;
+
+  public PushLocation(String host, int port) {
+    _host = host;
+    _port = port;
+  }
+
+  public static List<PushLocation> getPushLocations(String[] hosts, int port) {
+    List<PushLocation> pushLocations = new ArrayList<>(hosts.length);
+    for (String host : hosts) {
+      pushLocations.add(new PushLocation(host, port));
+    }
+    return pushLocations;
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  @Override
+  public String toString() {
+    return _host + ":" + _port;
+  }
+}
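For illustration (not part of the patch), a minimal usage sketch of PushLocation with hypothetical controller hosts; the push jobs build this list from the comma-separated push-hosts property and the push-port property:

    import java.util.List;
    import org.apache.pinot.spark.utils.PushLocation;

    public class PushLocationExample {
      public static void main(String[] args) {
        // Hypothetical controller hosts sharing one controller port.
        String[] hosts = {"controller-1.example.com", "controller-2.example.com"};
        List<PushLocation> pushLocations = PushLocation.getPushLocations(hosts, 9000);
        pushLocations.forEach(location -> System.out.println(location));
        // controller-1.example.com:9000
        // controller-2.example.com:9000
      }
    }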
diff --git a/pom.xml b/pom.xml
index e6dc7f1..6ceee81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <hadoop.version>2.7.0</hadoop.version>
     <spark.version>2.2.0</spark.version>
     <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
     <calcite.version>1.19.0</calcite.version>
     <!-- commons-configuration, hadoop-common, hadoop-client use commons-lang -->
@@ -597,6 +598,16 @@
         <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-library</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-reflect</artifactId>
+        <version>${scala.version}</version>
+      </dependency>
 
       <!-- Hadoop -->
       <dependency>

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
