http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java b/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java deleted file mode 100644 index 56b920b..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.metrics; - -import org.apache.storm.Config; -import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter; -import org.apache.storm.daemon.metrics.reporters.PreparableReporter; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class MetricsUtils { - private final static Logger LOG = LoggerFactory.getLogger(MetricsUtils.class); - - public static List<PreparableReporter> getPreparableReporters(Map stormConf) { - List<String> clazzes = (List<String>) stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS); - List<PreparableReporter> reporterList = new ArrayList<>(); - - if (clazzes != null) { - for (String clazz : clazzes) { - reporterList.add(getPreparableReporter(clazz)); - } - } - if (reporterList.isEmpty()) { - reporterList.add(new JmxPreparableReporter()); - } - return reporterList; - } - - private static PreparableReporter getPreparableReporter(String clazz) { - PreparableReporter reporter = null; - LOG.info("Using statistics reporter plugin:" + clazz); - if (clazz != null) { - reporter = (PreparableReporter) Utils.newInstance(clazz); - } - return reporter; - } - - public static Locale getMetricsReporterLocale(Map stormConf) { - String languageTag = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null); - if (languageTag != null) { - return Locale.forLanguageTag(languageTag); - } - return null; - } - - public static TimeUnit getMetricsRateUnit(Map stormConf) { - return getTimeUnitForCofig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT); - } - - public static TimeUnit getMetricsDurationUnit(Map stormConf) { - return getTimeUnitForCofig(stormConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT); - } - - private static TimeUnit getTimeUnitForCofig(Map stormConf, String configName) { - String rateUnitString = Utils.getString(stormConf.get(configName), null); - if (rateUnitString != null) { - return TimeUnit.valueOf(rateUnitString); - } - return null; - } - - public static File getCsvLogDir(Map stormConf) { - String csvMetricsLogDirectory = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null); - if (csvMetricsLogDirectory == null) { - csvMetricsLogDirectory = absoluteStormLocalDir(stormConf); - csvMetricsLogDirectory = csvMetricsLogDirectory + File.separator + "csvmetrics"; - } - File csvMetricsDir = new File(csvMetricsLogDirectory); - validateCreateOutputDir(csvMetricsDir); - return csvMetricsDir; - } - - private static void validateCreateOutputDir(File dir) { - if (!dir.exists()) { - dir.mkdirs(); - } - if (!dir.canWrite()) { - throw new IllegalStateException(dir.getName() + " does not have write permissions."); - } - if (!dir.isDirectory()) { - throw new IllegalStateException(dir.getName() + " is not a directory."); - } - } - - public static String absoluteStormLocalDir(Map conf) { - String stormHome = System.getProperty("storm.home"); - String localDir = (String) conf.get(Config.STORM_LOCAL_DIR); - if (localDir == null) { - return (stormHome + File.separator + "storm-local"); - } else { - if (new File(localDir).isAbsolute()) { - return localDir; - } else { - return (stormHome + File.separator + localDir); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java deleted file mode 100644 index 1eacb63..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.metrics.reporters; - -import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> { - private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); - ConsoleReporter reporter = null; - - @Override - public void prepare(MetricRegistry metricsRegistry, Map stormConf) { - LOG.debug("Preparing..."); - ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry); - - builder.outputTo(System.out); - Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf); - if (locale != null) { - builder.formattedFor(locale); - } - - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf); - if (rateUnit != null) { - builder.convertRatesTo(rateUnit); - } - - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf); - if (durationUnit != null) { - builder.convertDurationsTo(durationUnit); - } - reporter = builder.build(); - } - - @Override - public void start() { - if (reporter != null) { - LOG.debug("Starting..."); - reporter.start(10, TimeUnit.SECONDS); - } else { - throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); - } - } - - @Override - public void stop() { - if (reporter != null) { - LOG.debug("Stopping..."); - reporter.stop(); - } else { - throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java deleted file mode 100644 index 605f389..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.metrics.reporters; - -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class CsvPreparableReporter implements PreparableReporter<CsvReporter> { - private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); - CsvReporter reporter = null; - - @Override - public void prepare(MetricRegistry metricsRegistry, Map stormConf) { - LOG.debug("Preparing..."); - CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); - - Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf); - if (locale != null) { - builder.formatFor(locale); - } - - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf); - if (rateUnit != null) { - builder.convertRatesTo(rateUnit); - } - - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf); - if (durationUnit != null) { - builder.convertDurationsTo(durationUnit); - } - - File csvMetricsDir = MetricsUtils.getCsvLogDir(stormConf); - reporter = builder.build(csvMetricsDir); - } - - @Override - public void start() { - if (reporter != null) { - LOG.debug("Starting..."); - reporter.start(10, TimeUnit.SECONDS); - } else { - throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); - } - } - - @Override - public void stop() { - if (reporter != null) { - LOG.debug("Stopping..."); - reporter.stop(); - } else { - throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); - } - } - -} - http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java deleted file mode 100644 index cf4aa1c..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.metrics.reporters; - -import com.codahale.metrics.JmxReporter; -import com.codahale.metrics.MetricRegistry; -import org.apache.storm.Config; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class JmxPreparableReporter implements PreparableReporter<JmxReporter> { - private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); - JmxReporter reporter = null; - - @Override - public void prepare(MetricRegistry metricsRegistry, Map stormConf) { - LOG.info("Preparing..."); - JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); - String domain = Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null); - if (domain != null) { - builder.inDomain(domain); - } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf); - if (rateUnit != null) { - builder.convertRatesTo(rateUnit); - } - reporter = builder.build(); - - } - - @Override - public void start() { - if (reporter != null) { - LOG.debug("Starting..."); - reporter.start(); - } else { - throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); - } - } - - @Override - public void stop() { - if (reporter != null) { - LOG.debug("Stopping..."); - reporter.stop(); - } else { - throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java deleted file mode 100644 index 2968bfb..0000000 --- a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.metrics.reporters; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Reporter; - -import java.io.Closeable; -import java.util.Map; - - -public interface PreparableReporter<T extends Reporter & Closeable> { - void prepare(MetricRegistry metricsRegistry, Map stormConf); - void start(); - void stop(); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java b/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java deleted file mode 100644 index d360ae0..0000000 --- a/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.dependency; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.json.simple.JSONValue; -import org.json.simple.parser.ParseException; - -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -public class DependencyPropertiesParser { - public List<File> parseJarsProperties(String prop) { - if (prop.trim().isEmpty()) { - // handle no input - return Collections.emptyList(); - } - - List<String> dependencies = Arrays.asList(prop.split(",")); - return Lists.transform(dependencies, new Function<String, File>() { - @Override - public File apply(String filePath) { - return new File(filePath); - } - }); - } - - public Map<String, File> parseArtifactsProperties(String prop) { - try { - Map<String, String> parsed = (Map<String, String>) JSONValue.parseWithException(prop); - Map<String, File> packages = new LinkedHashMap<>(parsed.size()); - for (Map.Entry<String, String> artifactToFilePath : parsed.entrySet()) { - packages.put(artifactToFilePath.getKey(), new File(artifactToFilePath.getValue())); - } - - return packages; - } catch (ParseException e) { - throw new RuntimeException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java b/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java deleted file mode 100644 index 4f71f67..0000000 --- a/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.dependency; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.storm.blobstore.AtomicOutputStream; -import org.apache.storm.blobstore.BlobStoreUtils; -import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.generated.AccessControl; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.KeyAlreadyExistsException; -import org.apache.storm.generated.KeyNotFoundException; -import org.apache.storm.generated.SettableBlobMeta; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -public class DependencyUploader { - public static final Logger LOG = LoggerFactory.getLogger(DependencyUploader.class); - - private final Map<String, Object> conf; - private ClientBlobStore blobStore; - - public DependencyUploader() { - conf = Utils.readStormConfig(); - } - - public void init() { - if (blobStore == null) { - blobStore = Utils.getClientBlobStore(conf); - } - } - - public void shutdown() { - if (blobStore != null) { - blobStore.shutdown(); - } - } - - @VisibleForTesting - void setBlobStore(ClientBlobStore blobStore) { - this.blobStore = blobStore; - } - - @SuppressWarnings("unchecked") - public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException { - checkFilesExist(dependencies); - - List<String> keys = new ArrayList<>(dependencies.size()); - try { - for (File dependency : dependencies) { - String fileName = dependency.getName(); - String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName)); - - try { - uploadDependencyToBlobStore(key, dependency); - } catch (KeyAlreadyExistsException e) { - // it should never happened since we apply UUID - throw new RuntimeException(e); - } - - keys.add(key); - } - } catch (Throwable e) { - if (blobStore != null && cleanupIfFails) { - deleteBlobs(keys); - } - throw new RuntimeException(e); - } - - return keys; - } - - public List<String> uploadArtifacts(Map<String, File> artifacts) { - checkFilesExist(artifacts.values()); - - List<String> keys = new ArrayList<>(artifacts.size()); - try { - for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) { - String artifact = artifactToFile.getKey(); - File dependency = artifactToFile.getValue(); - - String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact)); - try { - uploadDependencyToBlobStore(key, dependency); - } catch (KeyAlreadyExistsException e) { - // we lose the race, but it doesn't matter - } - - keys.add(key); - } - } catch (Throwable e) { - throw new RuntimeException(e); - } - - return keys; - } - - public void deleteBlobs(List<String> keys) { - for (String key : keys) { - try { - blobStore.deleteBlob(key); - } catch (Throwable e) { - LOG.warn("blob delete failed - key: {} continue...", key); - } - } - } - - private String convertArtifactToJarFileName(String artifact) { - return artifact.replace(":", "-") + ".jar"; - } - - private boolean uploadDependencyToBlobStore(String key, File dependency) - throws KeyAlreadyExistsException, AuthorizationException, IOException { - - boolean uploadNew = false; - try { - // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved - // as a workaround, we call getBlobMeta() for all keys - blobStore.getBlobMeta(key); - } catch (KeyNotFoundException e) { - // TODO: do we want to add ACL here? - AtomicOutputStream blob = blobStore - .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>())); - Files.copy(dependency.toPath(), blob); - blob.close(); - - uploadNew = true; - } - - return uploadNew; - } - - private void checkFilesExist(Collection<File> dependencies) { - for (File dependency : dependencies) { - if (!dependency.isFile() || !dependency.exists()) { - throw new FileNotAvailableException(dependency.getAbsolutePath()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java b/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java deleted file mode 100644 index 874485b..0000000 --- a/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.dependency; - -public class FileNotAvailableException extends RuntimeException { - public FileNotAvailableException(String fileName) { - super(createMessage(fileName)); - } - - public FileNotAvailableException(String fileName, Throwable cause) { - super(createMessage(fileName), cause); - } - - private static String createMessage(String fileName) { - return "This file is not available: " + fileName; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java deleted file mode 100644 index 9d01e81..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.storm.generated.DRPCRequest; -import org.apache.storm.generated.DistributedRPCInvocations; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.security.auth.ThriftClient; -import org.apache.storm.security.auth.ThriftConnectionType; -import org.apache.thrift.transport.TTransportException; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface { - public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class); - private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>(); - private String host; - private int port; - - public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException { - super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null); - this.host = host; - this.port = port; - client.set(new DistributedRPCInvocations.Client(_protocol)); - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public void reconnectClient() throws TException { - if (client.get() == null) { - reconnect(); - client.set(new DistributedRPCInvocations.Client(_protocol)); - } - } - - public boolean isConnected() { - return client.get() != null; - } - - public void result(String id, String result) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - c.result(id, result); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - return c.fetchRequest(func); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public void failRequest(String id) throws TException, AuthorizationException { - DistributedRPCInvocations.Client c = client.get(); - try { - if (c == null) { - throw new TException("Client is not connected..."); - } - c.failRequest(id); - } catch(AuthorizationException aze) { - throw aze; - } catch(TException e) { - client.compareAndSet(c, null); - throw e; - } - } - - public DistributedRPCInvocations.Client getClient() { - return client.get(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java deleted file mode 100644 index 791fc91..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.Config; -import org.apache.storm.ILocalDRPC; -import org.apache.storm.generated.DRPCRequest; -import org.apache.storm.generated.DistributedRPCInvocations; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.ExtendedThreadPoolExecutor; -import org.apache.storm.utils.ServiceRegistry; -import org.apache.storm.utils.Utils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.Callable; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.thrift.TException; -import org.json.simple.JSONValue; - -public class DRPCSpout extends BaseRichSpout { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS - static final long serialVersionUID = 2387848310969237877L; - - public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class); - - SpoutOutputCollector _collector; - List<DRPCInvocationsClient> _clients = new ArrayList<>(); - transient LinkedList<Future<Void>> _futures = null; - transient ExecutorService _backround = null; - String _function; - String _local_drpc_id = null; - - private static class DRPCMessageId { - String id; - int index; - - public DRPCMessageId(String id, int index) { - this.id = id; - this.index = index; - } - } - - - public DRPCSpout(String function) { - _function = function; - } - - public DRPCSpout(String function, ILocalDRPC drpc) { - _function = function; - _local_drpc_id = drpc.getServiceId(); - } - - public String get_function() { - return _function; - } - - private class Adder implements Callable<Void> { - private String server; - private int port; - private Map conf; - - public Adder(String server, int port, Map conf) { - this.server = server; - this.port = port; - this.conf = conf; - } - - @Override - public Void call() throws Exception { - DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port); - synchronized (_clients) { - _clients.add(c); - } - return null; - } - } - - private void reconnect(final DRPCInvocationsClient c) { - _futures.add(_backround.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - c.reconnectClient(); - return null; - } - })); - } - - private void checkFutures() { - Iterator<Future<Void>> i = _futures.iterator(); - while (i.hasNext()) { - Future<Void> f = i.next(); - if (f.isDone()) { - i.remove(); - } - try { - f.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - if(_local_drpc_id==null) { - _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); - _futures = new LinkedList<>(); - - int numTasks = context.getComponentTasks(context.getThisComponentId()).size(); - int index = context.getThisTaskIndex(); - - int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); - List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS); - if(servers == null || servers.isEmpty()) { - throw new RuntimeException("No DRPC servers configured for topology"); - } - - if (numTasks < servers.size()) { - for (String s: servers) { - _futures.add(_backround.submit(new Adder(s, port, conf))); - } - } else { - int i = index % servers.size(); - _futures.add(_backround.submit(new Adder(servers.get(i), port, conf))); - } - } - - } - - @Override - public void close() { - for(DRPCInvocationsClient client: _clients) { - client.close(); - } - } - - @Override - public void nextTuple() { - boolean gotRequest = false; - if(_local_drpc_id==null) { - int size; - synchronized (_clients) { - size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end - } - for(int i=0; i<size; i++) { - DRPCInvocationsClient client; - synchronized (_clients) { - client = _clients.get(i); - } - if (!client.isConnected()) { - LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort()); - reconnect(client); - continue; - } - try { - DRPCRequest req = client.fetchRequest(_function); - if(req.get_request_id().length() > 0) { - Map returnInfo = new HashMap(); - returnInfo.put("id", req.get_request_id()); - returnInfo.put("host", client.getHost()); - returnInfo.put("port", client.getPort()); - gotRequest = true; - _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i)); - break; - } - } catch (AuthorizationException aze) { - reconnect(client); - LOG.error("Not authorized to fetch DRPC result from DRPC server", aze); - } catch (TException e) { - reconnect(client); - LOG.error("Failed to fetch DRPC result from DRPC server", e); - } catch (Exception e) { - LOG.error("Failed to fetch DRPC result from DRPC server", e); - } - } - checkFutures(); - } else { - DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); - if(drpc!=null) { // can happen during shutdown of drpc while topology is still up - try { - DRPCRequest req = drpc.fetchRequest(_function); - if(req.get_request_id().length() > 0) { - Map returnInfo = new HashMap(); - returnInfo.put("id", req.get_request_id()); - returnInfo.put("host", _local_drpc_id); - returnInfo.put("port", 0); - gotRequest = true; - _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0)); - } - } catch (AuthorizationException aze) { - throw new RuntimeException(aze); - } catch (TException e) { - throw new RuntimeException(e); - } - } - } - if(!gotRequest) { - Utils.sleep(1); - } - } - - @Override - public void ack(Object msgId) { - } - - @Override - public void fail(Object msgId) { - DRPCMessageId did = (DRPCMessageId) msgId; - DistributedRPCInvocations.Iface client; - - if(_local_drpc_id == null) { - client = _clients.get(did.index); - } else { - client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id); - } - try { - client.failRequest(did.id); - } catch (AuthorizationException aze) { - LOG.error("Not authorized to failREquest from DRPC server", aze); - } catch (TException e) { - LOG.error("Failed to fail request", e); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("args", "return-info")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java b/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java deleted file mode 100644 index f57bbb1..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class JoinResult extends BaseRichBolt { - public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class); - - String returnComponent; - Map<Object, Tuple> returns = new HashMap<>(); - Map<Object, Tuple> results = new HashMap<>(); - OutputCollector _collector; - - public JoinResult(String returnComponent) { - this.returnComponent = returnComponent; - } - - public void prepare(Map map, TopologyContext context, OutputCollector collector) { - _collector = collector; - } - - public void execute(Tuple tuple) { - Object requestId = tuple.getValue(0); - if(tuple.getSourceComponent().equals(returnComponent)) { - returns.put(requestId, tuple); - } else { - results.put(requestId, tuple); - } - - if(returns.containsKey(requestId) && results.containsKey(requestId)) { - Tuple result = results.remove(requestId); - Tuple returner = returns.remove(requestId); - LOG.debug(result.getValue(1).toString()); - List<Tuple> anchors = new ArrayList<>(); - anchors.add(result); - anchors.add(returner); - _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1))); - _collector.ack(result); - _collector.ack(returner); - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("result", "return-info")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java b/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java deleted file mode 100644 index 39860e6..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicBoltExecutor; -import org.apache.storm.topology.IBasicBolt; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.KeyedRoundRobinQueue; -import java.util.HashMap; -import java.util.Map; - - -public class KeyedFairBolt implements IRichBolt, FinishedCallback { - IRichBolt _delegate; - KeyedRoundRobinQueue<Tuple> _rrQueue; - Thread _executor; - FinishedCallback _callback; - - public KeyedFairBolt(IRichBolt delegate) { - _delegate = delegate; - } - - public KeyedFairBolt(IBasicBolt delegate) { - this(new BasicBoltExecutor(delegate)); - } - - - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - if(_delegate instanceof FinishedCallback) { - _callback = (FinishedCallback) _delegate; - } - _delegate.prepare(stormConf, context, collector); - _rrQueue = new KeyedRoundRobinQueue<Tuple>(); - _executor = new Thread(new Runnable() { - public void run() { - try { - while(true) { - _delegate.execute(_rrQueue.take()); - } - } catch (InterruptedException e) { - - } - } - }); - _executor.setDaemon(true); - _executor.start(); - } - - public void execute(Tuple input) { - Object key = input.getValue(0); - _rrQueue.add(key, input); - } - - public void cleanup() { - _executor.interrupt(); - _delegate.cleanup(); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - _delegate.declareOutputFields(declarer); - } - - public void finishedId(Object id) { - if(_callback!=null) { - _callback.finishedId(id); - } - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return new HashMap<String, Object>(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java deleted file mode 100644 index 6f82f80..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.grouping.CustomStreamGrouping; -import org.apache.storm.topology.ComponentConfigurationDeclarer; -import org.apache.storm.tuple.Fields; - -public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> { - public LinearDRPCInputDeclarer fieldsGrouping(Fields fields); - public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields); - - public LinearDRPCInputDeclarer globalGrouping(); - public LinearDRPCInputDeclarer globalGrouping(String streamId); - - public LinearDRPCInputDeclarer shuffleGrouping(); - public LinearDRPCInputDeclarer shuffleGrouping(String streamId); - - public LinearDRPCInputDeclarer localOrShuffleGrouping(); - public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId); - - public LinearDRPCInputDeclarer noneGrouping(); - public LinearDRPCInputDeclarer noneGrouping(String streamId); - - public LinearDRPCInputDeclarer allGrouping(); - public LinearDRPCInputDeclarer allGrouping(String streamId); - - public LinearDRPCInputDeclarer directGrouping(); - public LinearDRPCInputDeclarer directGrouping(String streamId); - - public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields); - public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields); - - public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping); - public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java deleted file mode 100644 index dc702a3..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java +++ /dev/null @@ -1,393 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.Constants; -import org.apache.storm.ILocalDRPC; -import org.apache.storm.coordination.BatchBoltExecutor; -import org.apache.storm.coordination.CoordinatedBolt; -import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback; -import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec; -import org.apache.storm.coordination.CoordinatedBolt.SourceArgs; -import org.apache.storm.coordination.IBatchBolt; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.generated.StreamInfo; -import org.apache.storm.grouping.CustomStreamGrouping; -import org.apache.storm.grouping.PartialKeyGrouping; -import org.apache.storm.topology.BaseConfigurationDeclarer; -import org.apache.storm.topology.BasicBoltExecutor; -import org.apache.storm.topology.BoltDeclarer; -import org.apache.storm.topology.IBasicBolt; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.InputDeclarer; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -// Trident subsumes the functionality provided by this class, so it's deprecated -@Deprecated -public class LinearDRPCTopologyBuilder { - String _function; - List<Component> _components = new ArrayList<Component>(); - - - public LinearDRPCTopologyBuilder(String function) { - _function = function; - } - - public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) { - return addBolt(new BatchBoltExecutor(bolt), parallelism); - } - - public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) { - return addBolt(bolt, 1); - } - - @Deprecated - public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) { - if(parallelism==null) parallelism = 1; - Component component = new Component(bolt, parallelism.intValue()); - _components.add(component); - return new InputDeclarerImpl(component); - } - - @Deprecated - public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) { - return addBolt(bolt, null); - } - - public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) { - return addBolt(new BasicBoltExecutor(bolt), parallelism); - } - - public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) { - return addBolt(bolt, null); - } - - public StormTopology createLocalTopology(ILocalDRPC drpc) { - return createTopology(new DRPCSpout(_function, drpc)); - } - - public StormTopology createRemoteTopology() { - return createTopology(new DRPCSpout(_function)); - } - - - private StormTopology createTopology(DRPCSpout spout) { - final String SPOUT_ID = "spout"; - final String PREPARE_ID = "prepare-request"; - - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout(SPOUT_ID, spout); - builder.setBolt(PREPARE_ID, new PrepareRequest()) - .noneGrouping(SPOUT_ID); - int i=0; - for(; i<_components.size();i++) { - Component component = _components.get(i); - - Map<String, SourceArgs> source = new HashMap<String, SourceArgs>(); - if (i==1) { - source.put(boltId(i-1), SourceArgs.single()); - } else if (i>=2) { - source.put(boltId(i-1), SourceArgs.all()); - } - IdStreamSpec idSpec = null; - if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) { - idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); - } - BoltDeclarer declarer = builder.setBolt( - boltId(i), - new CoordinatedBolt(component.bolt, source, idSpec), - component.parallelism); - - for(Map<String, Object> conf: component.componentConfs) { - declarer.addConfigurations(conf); - } - - if(idSpec!=null) { - declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); - } - if(i==0 && component.declarations.isEmpty()) { - declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); - } else { - String prevId; - if(i==0) { - prevId = PREPARE_ID; - } else { - prevId = boltId(i-1); - } - for(InputDeclaration declaration: component.declarations) { - declaration.declare(prevId, declarer); - } - } - if(i>0) { - declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); - } - } - - IRichBolt lastBolt = _components.get(_components.size()-1).bolt; - OutputFieldsGetter getter = new OutputFieldsGetter(); - lastBolt.declareOutputFields(getter); - Map<String, StreamInfo> streams = getter.getFieldsDeclaration(); - if(streams.size()!=1) { - throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology"); - } - String outputStream = streams.keySet().iterator().next(); - List<String> fields = streams.get(outputStream).get_output_fields(); - if(fields.size()!=2) { - throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); - } - - builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)) - .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))) - .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); - i++; - builder.setBolt(boltId(i), new ReturnResults()) - .noneGrouping(boltId(i-1)); - return builder.createTopology(); - } - - private static String boltId(int index) { - return "bolt" + index; - } - - private static class Component { - public IRichBolt bolt; - public int parallelism; - public List<Map<String, Object>> componentConfs = new ArrayList<>(); - public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>(); - - public Component(IRichBolt bolt, int parallelism) { - this.bolt = bolt; - this.parallelism = parallelism; - } - } - - private static interface InputDeclaration { - public void declare(String prevComponent, InputDeclarer declarer); - } - - private static class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer { - Component _component; - - public InputDeclarerImpl(Component component) { - _component = component; - } - - @Override - public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.fieldsGrouping(prevComponent, fields); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.fieldsGrouping(prevComponent, streamId, fields); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer globalGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.globalGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer globalGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.globalGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer shuffleGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.shuffleGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.shuffleGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer localOrShuffleGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.localOrShuffleGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer localOrShuffleGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.localOrShuffleGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer noneGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.noneGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer noneGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.noneGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer allGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.allGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer allGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.allGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer directGrouping() { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.directGrouping(prevComponent); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer directGrouping(final String streamId) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.directGrouping(prevComponent, streamId); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) { - return customGrouping(new PartialKeyGrouping(fields)); - } - - @Override - public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) { - return customGrouping(streamId, new PartialKeyGrouping(fields)); - } - - @Override - public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.customGrouping(prevComponent, grouping); - } - }); - return this; - } - - @Override - public LinearDRPCInputDeclarer customGrouping(final String streamId, final CustomStreamGrouping grouping) { - addDeclaration(new InputDeclaration() { - @Override - public void declare(String prevComponent, InputDeclarer declarer) { - declarer.customGrouping(prevComponent, streamId, grouping); - } - }); - return this; - } - - private void addDeclaration(InputDeclaration declaration) { - _component.declarations.add(declaration); - } - - @Override - public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) { - _component.componentConfs.add(conf); - return this; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java b/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java deleted file mode 100644 index 15a3320..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import java.util.Map; -import java.util.Random; -import org.apache.storm.utils.Utils; - - -public class PrepareRequest extends BaseBasicBolt { - public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID; - public static final String RETURN_STREAM = "ret"; - public static final String ID_STREAM = "id"; - - Random rand; - - @Override - public void prepare(Map map, TopologyContext context) { - rand = new Random(); - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String args = tuple.getString(0); - String returnInfo = tuple.getString(1); - long requestId = rand.nextLong(); - collector.emit(ARGS_STREAM, new Values(requestId, args)); - collector.emit(RETURN_STREAM, new Values(requestId, returnInfo)); - collector.emit(ID_STREAM, new Values(requestId)); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(ARGS_STREAM, new Fields("request", "args")); - declarer.declareStream(RETURN_STREAM, new Fields("request", "return")); - declarer.declareStream(ID_STREAM, new Fields("request")); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java deleted file mode 100644 index a9a5aa1..0000000 --- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.drpc; - -import org.apache.storm.Config; -import org.apache.storm.generated.DistributedRPCInvocations; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.ServiceRegistry; -import org.apache.storm.utils.Utils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; -import org.json.simple.JSONValue; - - -public class ReturnResults extends BaseRichBolt { - //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS - static final long serialVersionUID = -774882142710631591L; - - public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class); - OutputCollector _collector; - boolean local; - Map _conf; - Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>(); - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - _conf = stormConf; - _collector = collector; - local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local"); - } - - @Override - public void execute(Tuple input) { - String result = (String) input.getValue(0); - String returnInfo = (String) input.getValue(1); - if(returnInfo!=null) { - Map retMap = (Map) JSONValue.parse(returnInfo); - final String host = (String) retMap.get("host"); - final int port = Utils.getInt(retMap.get("port")); - String id = (String) retMap.get("id"); - DistributedRPCInvocations.Iface client; - if(local) { - client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host); - } else { - List server = new ArrayList() {{ - add(host); - add(port); - }}; - - if(!_clients.containsKey(server)) { - try { - _clients.put(server, new DRPCInvocationsClient(_conf, host, port)); - } catch (TTransportException ex) { - throw new RuntimeException(ex); - } - } - client = _clients.get(server); - } - - - int retryCnt = 0; - int maxRetries = 3; - while (retryCnt < maxRetries) { - retryCnt++; - try { - client.result(id, result); - _collector.ack(input); - break; - } catch (AuthorizationException aze) { - LOG.error("Not authorized to return results to DRPC server", aze); - _collector.fail(input); - throw new RuntimeException(aze); - } catch (TException tex) { - if (retryCnt >= maxRetries) { - LOG.error("Failed to return results to DRPC server", tex); - _collector.fail(input); - } - reconnectClient((DRPCInvocationsClient) client); - } - } - } - } - - private void reconnectClient(DRPCInvocationsClient client) { - if (client instanceof DRPCInvocationsClient) { - try { - LOG.info("reconnecting... "); - client.reconnectClient(); //Blocking call - } catch (TException e2) { - LOG.error("Failed to connect to DRPC server", e2); - } - } - } - @Override - public void cleanup() { - for(DRPCInvocationsClient c: _clients.values()) { - c.close(); - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } -}