This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f7c9ddb50d9e24cd0f8487162833c478f37a6329 Author: Xiang Fu <[email protected]> AuthorDate: Wed Dec 9 00:33:26 2020 -0800 Adding pinot minion segment creation task --- .../SegmentGenerationAndPushTaskGenerator.java | 241 +++++++++++++++++++++ .../minion/generator/TaskGeneratorRegistry.java | 1 + .../apache/pinot/core/common/MinionConstants.java | 6 + pinot-minion/pom.xml | 5 + .../executor/SegmentGenerationAndPushResult.java | 91 ++++++++ .../SegmentGenerationAndPushTaskExecutor.java | 187 ++++++++++++++++ ...egmentGenerationAndPushTaskExecutorFactory.java | 8 + .../executor/TaskExecutorFactoryRegistry.java | 1 + .../pinot/tools/BatchQuickstartWithMinion.java | 35 +++ .../org/apache/pinot/tools/BootstrapTableTool.java | 136 ++++++++++-- .../java/org/apache/pinot/tools/Quickstart.java | 17 +- .../tools/admin/command/QuickstartRunner.java | 21 ++ .../tools/admin/command/StartMinionCommand.java | 20 ++ 13 files changed, 743 insertions(+), 26 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java new file mode 100644 index 0000000..0d05472 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion.generator; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.FileSystems; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class); + + private final ClusterInfoAccessor _clusterInfoAccessor; + + public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) { + _clusterInfoAccessor = clusterInfoAccessor; + } + + @Override + public String getTaskType() { + return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + + for (TableConfig tableConfig : tableConfigs) { + // Only generate tasks for OFFLINE tables + String offlineTableName = tableConfig.getTableName(); + if (tableConfig.getTableType() != TableType.OFFLINE) { + LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName); + continue; + } + + TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); + Preconditions.checkNotNull(tableTaskConfig); + Map<String, String> taskConfigs = + tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE); + Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: {}", offlineTableName); + + // Get max number of tasks for this table + int tableMaxNumTasks; + String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); + if (tableMaxNumTasksConfig != null) { + try { + tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); + } catch (Exception e) { + tableMaxNumTasks = Integer.MAX_VALUE; + } + } else { + tableMaxNumTasks = Integer.MAX_VALUE; + } + + // Generate tasks + int tableNumTasks = 0; + // Generate up to tableMaxNumTasks tasks each time for each table + if (tableNumTasks == tableMaxNumTasks) { + break; + } + String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig); + String batchSegmentIngestionFrequency = 
IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig); + BatchIngestionConfig batchIngestionConfig = tableConfig.getIngestionConfig().getBatchIngestionConfig(); + List<Map<String, String>> batchConfigMaps = batchIngestionConfig.getBatchConfigMaps(); + for (Map<String, String> batchConfigMap : batchConfigMaps) { + try { + URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI)); + URI outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI)); + + List<OfflineSegmentZKMetadata> offlineSegmentsMetadata = Collections.emptyList(); + // For append mode, we don't create segments for input file URIs already created. + if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) { + offlineSegmentsMetadata = this._clusterInfoAccessor.getOfflineSegmentsMetadata(offlineTableName); + } + List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, + getExistingSegmentInputFiles(offlineSegmentsMetadata)); + + String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap); + for (URI inputFileURI : inputFileURIs) { + Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap); + singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_FILE_URI, inputFileURI.toString()); + URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI); + singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString()); + singleFileGenerationTaskConfig + .put(BatchConfigProperties.SCHEMA, JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName))); + singleFileGenerationTaskConfig + .put(BatchConfigProperties.TABLE_CONFIGS, JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName))); + singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(tableNumTasks)); + 
singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE); + singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode); + singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _clusterInfoAccessor.getVipUrl()); + // Only submit raw data files with timestamp larger than checkpoint + pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, + singleFileGenerationTaskConfig)); + tableNumTasks++; + + // Generate up to tableMaxNumTasks tasks each time for each table + if (tableNumTasks == tableMaxNumTasks) { + break; + } + } + } catch (Exception e) { + LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]", + tableConfig, taskConfigs, e); + } + } + } + return pinotTaskConfigs; + } + + private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI, + Set<String> existingSegmentInputFileURIs) { + String inputDirURIScheme = inputDirURI.getScheme(); + if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) { + String fsClass = batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS); + PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(batchConfigMap); + PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps); + } + PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme); + + String includeFileNamePattern = batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN); + String excludeFileNamePattern = batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN); + + //Get list of files to process + String[] files; + try { + files = inputDirFS.listFiles(inputDirURI, true); + } catch (IOException e) { + LOGGER.error("Unable to list files under URI: " + inputDirURI, e); + return Collections.emptyList(); + } + PathMatcher includeFilePathMatcher = null; + if (includeFileNamePattern != null) { 
+ includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includeFileNamePattern); + } + PathMatcher excludeFilePathMatcher = null; + if (excludeFileNamePattern != null) { + excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludeFileNamePattern); + } + List<URI> inputFileURIs = new ArrayList<>(); + for (String file : files) { + if (includeFilePathMatcher != null) { + if (!includeFilePathMatcher.matches(Paths.get(file))) { + continue; + } + } + if (excludeFilePathMatcher != null) { + if (excludeFilePathMatcher.matches(Paths.get(file))) { + continue; + } + } + try { + URI inputFileURI = new URI(file); + if (inputFileURI.getScheme() == null) { + inputFileURI = new File(file).toURI(); + } + if (inputDirFS.isDirectory(inputFileURI) || existingSegmentInputFileURIs.contains(inputFileURI.toString())) { + continue; + } + inputFileURIs.add(inputFileURI); + } catch (Exception e) { + continue; + } + } + return inputFileURIs; + } + + private Set<String> getExistingSegmentInputFiles(List<OfflineSegmentZKMetadata> offlineSegmentsMetadata) { + Set<String> existingSegmentInputFiles = new HashSet<>(); + for (OfflineSegmentZKMetadata metadata : offlineSegmentsMetadata) { + if ((metadata.getCustomMap() != null) && metadata.getCustomMap() + .containsKey(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)) { + existingSegmentInputFiles.add(metadata.getCustomMap().get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)); + } + } + return existingSegmentInputFiles; + } + + private URI getDirectoryUri(String uriStr) + throws URISyntaxException { + URI uri = new URI(uriStr); + if (uri.getScheme() == null) { + uri = new File(uriStr).toURI(); + } + return uri; + } + + public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { + URI relativePath = baseInputDir.relativize(inputFile); + Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), + "Unable to extract out the relative path based on base input path: 
" + baseInputDir); + String outputDirStr = outputDir.toString(); + outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir; + URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); + return relativeOutputURI; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java index f112d8b..21070d3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java @@ -36,6 +36,7 @@ public class TaskGeneratorRegistry { public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) { registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor)); registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor)); + registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor)); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index cd98833..546a9fb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -88,4 +88,10 @@ public class MinionConstants { public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType"; public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment"; } + + // Generate segment and push to controller based on batch ingestion configs + public static class SegmentGenerationAndPushTask { + public static final String TASK_TYPE = "SegmentGenerationAndPushTask"; + } + } diff --git a/pinot-minion/pom.xml 
b/pinot-minion/pom.xml index cc3608f..f9a442c 100644 --- a/pinot-minion/pom.xml +++ b/pinot-minion/pom.xml @@ -81,6 +81,11 @@ <artifactId>pinot-core</artifactId> </dependency> <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-batch-ingestion-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.helix</groupId> <artifactId>helix-core</artifactId> <exclusions> diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java new file mode 100644 index 0000000..d1cabcb --- /dev/null +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.minion.executor; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.core.minion.PinotTaskConfig; + + +/** + * The class <code>SegmentGenerationAndPushResult</code> wraps the segment generation and push + * results. 
/**
 * The class <code>SegmentGenerationAndPushResult</code> wraps the segment generation and push
 * results: a success flag, the name of the generated segment, the exception recorded for a failed run and
 * arbitrary custom properties.
 *
 * <p>Instances are created through the {@link Builder}.
 */
public class SegmentGenerationAndPushResult {
  private final boolean _succeed;
  private final String _segmentName;
  private final Exception _exception;
  private final Map<String, Object> _customProperties;

  private SegmentGenerationAndPushResult(boolean succeed, String segmentName, Exception exception,
      Map<String, Object> customProperties) {
    _succeed = succeed;
    _segmentName = segmentName;
    _exception = exception;
    _customProperties = customProperties;
  }

  /**
   * Returns the custom property stored under the given key, cast to the type expected by the caller.
   *
   * @return the property value, or <code>null</code> if no property was set for the key
   * @throws ClassCastException at the call site if the stored value is not of the expected type
   */
  @SuppressWarnings("unchecked")
  public <T> T getCustomProperty(String key) {
    return (T) _customProperties.get(key);
  }

  /**
   * Returns the exception set on the builder, or <code>null</code> if none was set.
   */
  public Exception getException() {
    return _exception;
  }

  /**
   * Returns the success flag set on the builder (<code>false</code> unless explicitly set).
   */
  public boolean isSucceed() {
    return _succeed;
  }

  /**
   * Returns the segment name set on the builder, or <code>null</code> if none was set.
   */
  public String getSegmentName() {
    return _segmentName;
  }

  /**
   * Fluent builder for {@link SegmentGenerationAndPushResult}. Every setter returns the builder itself.
   */
  public static class Builder {
    private boolean _succeed;
    private String _segmentName;
    private Exception _exception;
    private final Map<String, Object> _customProperties = new HashMap<>();

    public Builder setSucceed(boolean succeed) {
      _succeed = succeed;
      return this;
    }

    public Builder setSegmentName(String segmentName) {
      _segmentName = segmentName;
      return this;
    }

    public Builder setException(Exception exception) {
      _exception = exception;
      return this;
    }

    public Builder setCustomProperty(String key, Object property) {
      _customProperties.put(key, property);
      return this;
    }

    /**
     * Creates the result from the current builder state.
     */
    public SegmentGenerationAndPushResult build() {
      // Copy the properties so that further changes on this builder don't leak into the built result
      return new SegmentGenerationAndPushResult(_succeed, _segmentName, _exception,
          new HashMap<>(_customProperties));
    }
  }
}
org.apache.pinot.minion.executor; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.LocalPinotFS; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.ingestion.batch.spec.Constants; +import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec; +import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; +import org.apache.pinot.spi.utils.DataSizeUtils; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.apache.pinot.spi.utils.retry.RetriableOperationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor { + private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class); + + private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS(); + + @Override + public Object executeTask(PinotTaskConfig pinotTaskConfig) + throws Exception { + Map<String, String> taskConfigs = pinotTaskConfig.getConfigs(); + SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder(); + File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID()); + try { + // Generate Pinot Segment + SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec(); + URI inputFileURI = URI.create(taskConfigs.get(BatchConfigProperties.INPUT_FILE_URI)); + File localInputTempDir = new File(localTempDir, "input"); + FileUtils.forceMkdir(localInputTempDir); + File localOutputTempDir = new File(localTempDir, "output"); + FileUtils.forceMkdir(localOutputTempDir); + String inputFileURIScheme = inputFileURI.getScheme(); + if (inputFileURIScheme == null) { + inputFileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + if (!PinotFSFactory.isSchemeSupported(inputFileURIScheme)) { + String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS); + PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(taskConfigs); + PinotFSFactory.register(inputFileURIScheme, fsClass, fsProps); + } + PinotFS inputFileFS = PinotFSFactory.create(inputFileURIScheme); + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + String outputURIScheme = outputSegmentDirURI.getScheme(); + if (outputURIScheme == null) { + outputURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; + } + PinotFS outputFileFS = PinotFSFactory.create(outputURIScheme); + //copy input path to local + File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName()); + inputFileFS.copyToLocalFile(inputFileURI, localInputDataFile); + taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); + 
taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); + + RecordReaderSpec recordReaderSpec = new RecordReaderSpec(); + recordReaderSpec.setDataFormat(taskConfigs.get(BatchConfigProperties.INPUT_FORMAT)); + recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS)); + recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS)); + taskSpec.setRecordReaderSpec(recordReaderSpec); + Schema schema; + if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) { + schema = JsonUtils + .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class); + } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) { + schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI)); + } else { + throw new RuntimeException( + "Missing schema for segment generation job: please set `schema` or `schemaURI` in task config."); + } + taskSpec.setSchema(schema); + JsonNode tableConfig = JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS)); + taskSpec.setTableConfig(tableConfig); + taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID))); + SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec(); + segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE)); + segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils + .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_CONFIGS)); + taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec); + taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); + SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); + String segmentName = taskRunner.run(); + // Tar segment directory to compress file + File localSegmentDir = new 
File(localOutputTempDir, segmentName); + String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT; + File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName); + LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile); + TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile); + long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir); + long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); + LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, + DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); + //move segment to output PinotFS + URI outputSegmentTarURI = URI.create(outputSegmentDirURI + segmentTarFileName); + if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS + .exists(outputSegmentDirURI)) { + LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentDirURI)); + } else { + outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + } + resultBuilder.setSegmentName(segmentName); + // Segment push task + //Get list of files to process + String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE); + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setPushAttempts(5); + pushJobSpec.setPushParallelism(1); + pushJobSpec.setPushRetryIntervalMillis(1000); + pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); + pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec(); + spec.setPushJobSpec(pushJobSpec); + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(tableConfig.get(BatchConfigProperties.TABLE_NAME).asText()); + spec.setTableSpec(tableSpec); + PinotClusterSpec pinotClusterSpec = new PinotClusterSpec(); + 
pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI)); + PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec}; + spec.setPinotClusterSpecs(pinotClusterSpecs); + switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { + case TAR: + try { + SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS, Arrays.asList(outputSegmentTarURI.toString())); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case URI: + try { + List<String> segmentUris = new ArrayList<>(); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputSegmentDirURI, outputSegmentTarURI, pushJobSpec.getSegmentUriPrefix(), + pushJobSpec.getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); + SegmentPushUtils.sendSegmentUris(spec, segmentUris); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + case METADATA: + try { + Map<String, String> segmentUriToTarPathMap = SegmentPushUtils + .getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec.getSegmentUriPrefix(), + pushJobSpec.getSegmentUriSuffix(), new String[]{outputSegmentTarURI.toString()}); + SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap); + } catch (RetriableOperationException | AttemptsExceededException e) { + throw new RuntimeException(e); + } + break; + default: + throw new Exception("Unrecognized push mode - " + pushMode); + } + resultBuilder.setSucceed(true); + } catch (Exception e) { + resultBuilder.setException(e); + } finally { + // Cleanup output dir + FileUtils.deleteQuietly(localTempDir); + } + return resultBuilder.build(); + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java 
new file mode 100644 index 0000000..e4b6447 --- /dev/null +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java @@ -0,0 +1,8 @@ +package org.apache.pinot.minion.executor; + +public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory { + @Override + public PinotTaskExecutor create() { + return new SegmentGenerationAndPushTaskExecutor(); + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java index 1b783dc..a86a39b 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java @@ -38,6 +38,7 @@ public class TaskExecutorFactoryRegistry { registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory()); registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager)); + registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new SegmentGenerationAndPushTaskExecutorFactory()); } /** diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java new file mode 100644 index 0000000..9dc11b6 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import org.apache.pinot.spi.plugin.PluginManager; + + +public class BatchQuickstartWithMinion extends Quickstart { + + public String getBootstrapDataDir() { + return "examples/minions/batch/baseballStats"; + } + + public static void main(String[] args) + throws Exception { + PluginManager.get().init(); + new BatchQuickstartWithMinion().execute(); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java index 0723b06..fa70e8a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java @@ -21,12 +21,24 @@ package org.apache.pinot.tools; import com.google.common.base.Preconditions; import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.FileReader; import java.io.Reader; +import java.net.URI; import java.net.URL; +import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.minion.MinionClient; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.ingestion.batch.BatchConfig; +import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.tools.admin.command.AddTableCommand; import org.apache.pinot.tools.utils.JarUtils; import org.slf4j.Logger; @@ -36,9 +48,11 @@ import org.yaml.snakeyaml.Yaml; public class BootstrapTableTool { private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class); + private static final String COMPLETED = "COMPLETED"; private final String _controllerHost; private final int _controllerPort; private final String _tableDir; + private final MinionClient _minionClient; public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) { Preconditions.checkNotNull(controllerHost); @@ -46,11 +60,13 @@ public class BootstrapTableTool { _controllerHost = controllerHost; _controllerPort = controllerPort; _tableDir = tableDir; + _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort)); } public boolean execute() throws Exception { File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis())); + setupTableTmpDir.mkdirs(); File tableDir = new File(_tableDir); String tableName = tableDir.getName(); @@ -95,41 +111,121 @@ public class BootstrapTableTool { .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost) .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute(); } + private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile, File offlineTableConfigFile, File ingestionJobSpecFile) throws Exception { - LOGGER.info("Adding offline table: {}", tableName); - boolean tableCreationResult = createTable(schemaFile, offlineTableConfigFile); + TableConfig tableConfig = 
+ JsonUtils.inputStreamToObject(new FileInputStream(offlineTableConfigFile), TableConfig.class); + if (tableConfig.getIngestionConfig() != null + && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) { + updatedTableConfig(tableConfig, tableName, setupTableTmpDir); + } + LOGGER.info("Adding offline table: {}", tableName); + File updatedTableConfigFile = + new File(setupTableTmpDir, String.format("%s_%d.config", tableName, System.currentTimeMillis())); + FileOutputStream outputStream = new FileOutputStream(updatedTableConfigFile); + outputStream.write(JsonUtils.objectToPrettyString(tableConfig).getBytes()); + outputStream.close(); + boolean tableCreationResult = createTable(schemaFile, updatedTableConfigFile); if (!tableCreationResult) { throw new RuntimeException(String .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName, schemaFile, offlineTableConfigFile)); } + if (tableConfig.getTaskConfig() != null) { + _minionClient.scheduleMinionTasks(); + waitForMinionTaskToFinish(30_000L); + } + if (ingestionJobSpecFile != null) { + if (ingestionJobSpecFile.exists()) { + LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]", + tableName, _controllerHost, _controllerPort); + try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) { + SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); + String inputDirURI = spec.getInputDirURI(); + if (!new File(inputDirURI).exists()) { + URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI); + if (resolvedInputDirURI.getProtocol().equals("jar")) { + String[] splits = resolvedInputDirURI.getFile().split("!"); + String inputDir = new File(setupTableTmpDir, "inputData").toString(); + JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir); + spec.setInputDirURI(inputDir); + } else { 
+ spec.setInputDirURI(resolvedInputDirURI.toString()); + } + } + IngestionJobLauncher.runIngestionJob(spec); + } + } else { + LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion", + ingestionJobSpecFile.getAbsolutePath()); + } + } + return true; + } - if (ingestionJobSpecFile.exists()) { - LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]", - tableName, _controllerHost, _controllerPort); - try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) { - SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); - String inputDirURI = spec.getInputDirURI(); - if (!new File(inputDirURI).exists()) { - URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI); - if (resolvedInputDirURI.getProtocol().equals("jar")) { + private void updatedTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir) + throws Exception { + final List<Map<String, String>> batchConfigsMaps = + tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps(); + for (Map<String, String> batchConfigsMap : batchConfigsMaps) { + BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap); + String inputDirURI = batchConfig.getInputDirURI(); + if (!new File(inputDirURI).exists()) { + URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI); + if (resolvedInputDirURI != null) { + if ("jar".equals(resolvedInputDirURI.getProtocol())) { String[] splits = resolvedInputDirURI.getFile().split("!"); - String inputDir = new File(setupTableTmpDir, "inputData").toString(); - JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir); - spec.setInputDirURI(inputDir); + File inputDir = new File(setupTableTmpDir, "inputData"); + JarUtils.copyResourcesToDirectory(splits[0], 
splits[1].substring(1), inputDir.toString()); + batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.toURI().toString()); + batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI, + new File(inputDir.getParent(), "segments").toURI().toString()); } else { - spec.setInputDirURI(resolvedInputDirURI.toString()); + final URI inputURI = resolvedInputDirURI.toURI(); + batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputURI.toString()); + URI outputURI = + inputURI.getPath().endsWith("/") ? inputURI.resolve("../segments") : inputURI.resolve("./segments"); + batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputURI.toString()); } } - IngestionJobLauncher.runIngestionJob(spec); } - } else { - LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion", - ingestionJobSpecFile.getAbsolutePath()); } - return true; + } + + private boolean waitForMinionTaskToFinish(long timeoutInMillis) { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < timeoutInMillis) { + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + // Swallow the exception + } + try { + final Map<String, String> taskStatesMap = + _minionClient.getTasksStates(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE); + if (taskStatesMap.isEmpty()) { + LOGGER.info("No scheduled tasks yet, sleep 500 millis seconds"); + continue; + } + boolean allCompleted = true; + for (String taskId : taskStatesMap.keySet()) { + if (!COMPLETED.equalsIgnoreCase(taskStatesMap.get(taskId))) { + allCompleted = false; + break; + } + } + if (allCompleted) { + LOGGER.info("All minion tasks are completed."); + return true; + } + } catch (Exception e) { + LOGGER.error("Failed to query task endpoint", e); + continue; + } + } + return false; } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java index 69f5941..475a8ec 100644 --- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java @@ -43,6 +43,10 @@ public class Quickstart { } } + public String getBootstrapDataDir() { + return "examples/batch/baseballStats"; + } + public static void printStatus(Color color, String message) { System.out.println(color._code + message + Color.RESET._code); } @@ -144,16 +148,17 @@ public class Quickstart { File dataFile = new File(dataDir, "baseballStats_data.csv"); ClassLoader classLoader = Quickstart.class.getClassLoader(); - URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json"); + URL resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_schema.json"); com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, schemaFile); - resource = classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv"); + resource = classLoader.getResource(getBootstrapDataDir() + "/rawdata/baseballStats_data.csv"); com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, dataFile); - resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml"); - com.google.common.base.Preconditions.checkNotNull(resource); - FileUtils.copyURLToFile(resource, ingestionJobSpecFile); - resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json"); + resource = classLoader.getResource(getBootstrapDataDir() + "/ingestionJobSpec.yaml"); + if (resource != null) { + FileUtils.copyURLToFile(resource, ingestionJobSpecFile); + } + resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_offline_table_config.json"); com.google.common.base.Preconditions.checkNotNull(resource); FileUtils.copyURLToFile(resource, tableConfigFile); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index e2e3e38..072effd 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -53,6 +53,8 @@ public class QuickstartRunner { private static final int DEFAULT_SERVER_ADMIN_API_PORT = 7500; private static final int DEFAULT_BROKER_PORT = 8000; private static final int DEFAULT_CONTROLLER_PORT = 9000; + private static final int DEFAULT_MINION_PORT = 6000; + private static final String DEFAULT_ZK_DIR = "PinotZkDir"; private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir"; @@ -63,6 +65,7 @@ public class QuickstartRunner { private final int _numServers; private final int _numBrokers; private final int _numControllers; + private final int _numMinions; private final File _tempDir; private final boolean _enableTenantIsolation; @@ -74,10 +77,17 @@ public class QuickstartRunner { public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers, int numControllers, File tempDir, boolean enableIsolation) throws Exception { + this(tableRequests, numServers, numBrokers, numControllers, 1, tempDir, enableIsolation); + } + + public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers, + int numControllers, int numMinions, File tempDir, boolean enableIsolation) + throws Exception { _tableRequests = tableRequests; _numServers = numServers; _numBrokers = numBrokers; _numControllers = numControllers; + _numMinions = numMinions; _tempDir = tempDir; _enableTenantIsolation = enableIsolation; clean(); @@ -131,6 +141,16 @@ public class QuickstartRunner { } } + private void startMinions() + throws Exception { + for (int i = 0; i < _numMinions; i++) { + StartMinionCommand minionStarter = new StartMinionCommand(); + minionStarter.setMinionPort(DEFAULT_MINION_PORT + i) + 
.setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME); + minionStarter.execute(); + } + } + private void clean() throws Exception { FileUtils.cleanDirectory(_tempDir); @@ -142,6 +162,7 @@ public class QuickstartRunner { startControllers(); startBrokers(); startServers(); + startMinions(); } public void stop() diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java index 4ba95bc..74cb81f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java @@ -110,4 +110,24 @@ public class StartMinionCommand extends AbstractBaseAdminCommand implements Comm } return PinotConfigUtils.generateMinionConf(_minionHost, _minionPort); } + + public StartMinionCommand setMinionHost(String minionHost) { + _minionHost = minionHost; + return this; + } + + public StartMinionCommand setMinionPort(int minionPort) { + _minionPort = minionPort; + return this; + } + + public StartMinionCommand setZkAddress(String zkAddress) { + _zkAddress = zkAddress; + return this; + } + + public StartMinionCommand setClusterName(String clusterName) { + _clusterName = clusterName; + return this; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
