http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java 
b/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java
deleted file mode 100644
index 56b920b..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/metrics/MetricsUtils.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class MetricsUtils {
-    private final static Logger LOG = 
LoggerFactory.getLogger(MetricsUtils.class);
-
-    public static List<PreparableReporter> getPreparableReporters(Map 
stormConf) {
-        List<String> clazzes = (List<String>) 
stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGINS);
-        List<PreparableReporter> reporterList = new ArrayList<>();
-
-        if (clazzes != null) {
-            for (String clazz : clazzes) {
-                reporterList.add(getPreparableReporter(clazz));
-            }
-        }
-        if (reporterList.isEmpty()) {
-            reporterList.add(new JmxPreparableReporter());
-        }
-        return reporterList;
-    }
-
-    private static PreparableReporter getPreparableReporter(String clazz) {
-        PreparableReporter reporter = null;
-        LOG.info("Using statistics reporter plugin:" + clazz);
-        if (clazz != null) {
-            reporter = (PreparableReporter) Utils.newInstance(clazz);
-        }
-        return reporter;
-    }
-
-    public static Locale getMetricsReporterLocale(Map stormConf) {
-        String languageTag = 
Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE),
 null);
-        if (languageTag != null) {
-            return Locale.forLanguageTag(languageTag);
-        }
-        return null;
-    }
-
-    public static TimeUnit getMetricsRateUnit(Map stormConf) {
-        return getTimeUnitForCofig(stormConf, 
Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT);
-    }
-
-    public static TimeUnit getMetricsDurationUnit(Map stormConf) {
-        return getTimeUnitForCofig(stormConf, 
Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT);
-    }
-
-    private static TimeUnit getTimeUnitForCofig(Map stormConf, String 
configName) {
-        String rateUnitString = Utils.getString(stormConf.get(configName), 
null);
-        if (rateUnitString != null) {
-            return TimeUnit.valueOf(rateUnitString);
-        }
-        return null;
-    }
-
-    public static File getCsvLogDir(Map stormConf) {
-        String csvMetricsLogDirectory = 
Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR),
 null);
-        if (csvMetricsLogDirectory == null) {
-            csvMetricsLogDirectory = absoluteStormLocalDir(stormConf);
-            csvMetricsLogDirectory = csvMetricsLogDirectory + File.separator + 
"csvmetrics";
-        }
-        File csvMetricsDir = new File(csvMetricsLogDirectory);
-        validateCreateOutputDir(csvMetricsDir);
-        return csvMetricsDir;
-    }
-
-    private static void validateCreateOutputDir(File dir) {
-        if (!dir.exists()) {
-            dir.mkdirs();
-        }
-        if (!dir.canWrite()) {
-            throw new IllegalStateException(dir.getName() + " does not have 
write permissions.");
-        }
-        if (!dir.isDirectory()) {
-            throw new IllegalStateException(dir.getName() + " is not a 
directory.");
-        }
-    }
-
-    public static String absoluteStormLocalDir(Map conf) {
-        String stormHome = System.getProperty("storm.home");
-        String localDir = (String) conf.get(Config.STORM_LOCAL_DIR);
-        if (localDir == null) {
-            return (stormHome + File.separator + "storm-local");
-        } else {
-            if (new File(localDir).isAbsolute()) {
-                return localDir;
-            } else {
-                return (stormHome + File.separator + localDir);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 
b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
deleted file mode 100644
index 1eacb63..0000000
--- 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics.reporters;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class ConsolePreparableReporter implements 
PreparableReporter<ConsoleReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(ConsolePreparableReporter.class);
-    ConsoleReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.debug("Preparing...");
-        ConsoleReporter.Builder builder = 
ConsoleReporter.forRegistry(metricsRegistry);
-
-        builder.outputTo(System.out);
-        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
-        if (locale != null) {
-            builder.formattedFor(locale);
-        }
-
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
-        if (rateUnit != null) {
-            builder.convertRatesTo(rateUnit);
-        }
-
-        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(durationUnit);
-        }
-        reporter = builder.build();
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.debug("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.debug("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java
 
b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java
deleted file mode 100644
index 605f389..0000000
--- 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/CsvPreparableReporter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics.reporters;
-
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class CsvPreparableReporter implements PreparableReporter<CsvReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(CsvPreparableReporter.class);
-    CsvReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.debug("Preparing...");
-        CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry);
-
-        Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf);
-        if (locale != null) {
-            builder.formatFor(locale);
-        }
-
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
-        if (rateUnit != null) {
-            builder.convertRatesTo(rateUnit);
-        }
-
-        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf);
-        if (durationUnit != null) {
-            builder.convertDurationsTo(durationUnit);
-        }
-
-        File csvMetricsDir = MetricsUtils.getCsvLogDir(stormConf);
-        reporter = builder.build(csvMetricsDir);
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.debug("Starting...");
-            reporter.start(10, TimeUnit.SECONDS);
-        } else {
-            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.debug("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
-        }
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java
 
b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java
deleted file mode 100644
index cf4aa1c..0000000
--- 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/JmxPreparableReporter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics.reporters;
-
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class JmxPreparableReporter implements PreparableReporter<JmxReporter> {
-    private final static Logger LOG = 
LoggerFactory.getLogger(JmxPreparableReporter.class);
-    JmxReporter reporter = null;
-
-    @Override
-    public void prepare(MetricRegistry metricsRegistry, Map stormConf) {
-        LOG.info("Preparing...");
-        JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry);
-        String domain = 
Utils.getString(stormConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN),
 null);
-        if (domain != null) {
-            builder.inDomain(domain);
-        }
-        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf);
-        if (rateUnit != null) {
-            builder.convertRatesTo(rateUnit);
-        }
-        reporter = builder.build();
-
-    }
-
-    @Override
-    public void start() {
-        if (reporter != null) {
-            LOG.debug("Starting...");
-            reporter.start();
-        } else {
-            throw new IllegalStateException("Attempt to start without 
preparing " + getClass().getSimpleName());
-        }
-    }
-
-    @Override
-    public void stop() {
-        if (reporter != null) {
-            LOG.debug("Stopping...");
-            reporter.stop();
-        } else {
-            throw new IllegalStateException("Attempt to stop without preparing 
" + getClass().getSimpleName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java
 
b/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java
deleted file mode 100644
index 2968bfb..0000000
--- 
a/storm-core/src/jvm/backtype/storm/daemon/metrics/reporters/PreparableReporter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.metrics.reporters;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
-
-import java.io.Closeable;
-import java.util.Map;
-
-
-public interface PreparableReporter<T extends Reporter & Closeable> {
-  void prepare(MetricRegistry metricsRegistry, Map stormConf);
-  void start();
-  void stop();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java 
b/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java
deleted file mode 100644
index d360ae0..0000000
--- 
a/storm-core/src/jvm/backtype/storm/dependency/DependencyPropertiesParser.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.dependency;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.json.simple.JSONValue;
-import org.json.simple.parser.ParseException;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DependencyPropertiesParser {
-    public List<File> parseJarsProperties(String prop) {
-        if (prop.trim().isEmpty()) {
-            // handle no input
-            return Collections.emptyList();
-        }
-
-        List<String> dependencies = Arrays.asList(prop.split(","));
-        return Lists.transform(dependencies, new Function<String, File>() {
-            @Override
-            public File apply(String filePath) {
-                return new File(filePath);
-            }
-        });
-    }
-
-    public Map<String, File> parseArtifactsProperties(String prop) {
-        try {
-            Map<String, String> parsed = (Map<String, String>) 
JSONValue.parseWithException(prop);
-            Map<String, File> packages = new LinkedHashMap<>(parsed.size());
-            for (Map.Entry<String, String> artifactToFilePath : 
parsed.entrySet()) {
-                packages.put(artifactToFilePath.getKey(), new 
File(artifactToFilePath.getValue()));
-            }
-
-            return packages;
-        } catch (ParseException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java 
b/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java
deleted file mode 100644
index 4f71f67..0000000
--- a/storm-core/src/jvm/backtype/storm/dependency/DependencyUploader.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.dependency;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.blobstore.AtomicOutputStream;
-import org.apache.storm.blobstore.BlobStoreUtils;
-import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyAlreadyExistsException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class DependencyUploader {
-    public static final Logger LOG = 
LoggerFactory.getLogger(DependencyUploader.class);
-
-    private final Map<String, Object> conf;
-    private ClientBlobStore blobStore;
-
-    public DependencyUploader() {
-        conf = Utils.readStormConfig();
-    }
-
-    public void init() {
-        if (blobStore == null) {
-            blobStore = Utils.getClientBlobStore(conf);
-        }
-    }
-
-    public void shutdown() {
-        if (blobStore != null) {
-            blobStore.shutdown();
-        }
-    }
-
-    @VisibleForTesting
-    void setBlobStore(ClientBlobStore blobStore) {
-        this.blobStore = blobStore;
-    }
-
-    @SuppressWarnings("unchecked")
-    public List<String> uploadFiles(List<File> dependencies, boolean 
cleanupIfFails) throws IOException, AuthorizationException {
-        checkFilesExist(dependencies);
-
-        List<String> keys = new ArrayList<>(dependencies.size());
-        try {
-            for (File dependency : dependencies) {
-                String fileName = dependency.getName();
-                String key = 
BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));
-
-                try {
-                    uploadDependencyToBlobStore(key, dependency);
-                } catch (KeyAlreadyExistsException e) {
-                    // it should never happened since we apply UUID
-                    throw new RuntimeException(e);
-                }
-
-                keys.add(key);
-            }
-        } catch (Throwable e) {
-            if (blobStore != null && cleanupIfFails) {
-                deleteBlobs(keys);
-            }
-            throw new RuntimeException(e);
-        }
-
-        return keys;
-    }
-
-    public List<String> uploadArtifacts(Map<String, File> artifacts) {
-        checkFilesExist(artifacts.values());
-
-        List<String> keys = new ArrayList<>(artifacts.size());
-        try {
-            for (Map.Entry<String, File> artifactToFile : 
artifacts.entrySet()) {
-                String artifact = artifactToFile.getKey();
-                File dependency = artifactToFile.getValue();
-
-                String key = 
BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
-                try {
-                    uploadDependencyToBlobStore(key, dependency);
-                } catch (KeyAlreadyExistsException e) {
-                    // we lose the race, but it doesn't matter
-                }
-
-                keys.add(key);
-            }
-        } catch (Throwable e) {
-            throw new RuntimeException(e);
-        }
-
-        return keys;
-    }
-
-    public void deleteBlobs(List<String> keys) {
-        for (String key : keys) {
-            try {
-                blobStore.deleteBlob(key);
-            } catch (Throwable e) {
-                LOG.warn("blob delete failed - key: {} continue...", key);
-            }
-        }
-    }
-
-    private String convertArtifactToJarFileName(String artifact) {
-        return artifact.replace(":", "-") + ".jar";
-    }
-
-    private boolean uploadDependencyToBlobStore(String key, File dependency)
-            throws KeyAlreadyExistsException, AuthorizationException, 
IOException {
-
-        boolean uploadNew = false;
-        try {
-            // FIXME: we can filter by listKeys() with local blobstore when 
STORM-1986 is going to be resolved
-            // as a workaround, we call getBlobMeta() for all keys
-            blobStore.getBlobMeta(key);
-        } catch (KeyNotFoundException e) {
-            // TODO: do we want to add ACL here?
-            AtomicOutputStream blob = blobStore
-                    .createBlob(key, new SettableBlobMeta(new 
ArrayList<AccessControl>()));
-            Files.copy(dependency.toPath(), blob);
-            blob.close();
-
-            uploadNew = true;
-        }
-
-        return uploadNew;
-    }
-
-    private void checkFilesExist(Collection<File> dependencies) {
-        for (File dependency : dependencies) {
-            if (!dependency.isFile() || !dependency.exists()) {
-                throw new 
FileNotAvailableException(dependency.getAbsolutePath());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java 
b/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java
deleted file mode 100644
index 874485b..0000000
--- 
a/storm-core/src/jvm/backtype/storm/dependency/FileNotAvailableException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.dependency;
-
-public class FileNotAvailableException extends RuntimeException {
-    public FileNotAvailableException(String fileName) {
-        super(createMessage(fileName));
-    }
-
-    public FileNotAvailableException(String fileName, Throwable cause) {
-        super(createMessage(fileName), cause);
-    }
-
-    private static String createMessage(String fileName) {
-        return "This file is not available: " + fileName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java 
b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
deleted file mode 100644
index 9d01e81..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.generated.DistributedRPCInvocations;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.security.auth.ThriftClient;
-import org.apache.storm.security.auth.ThriftConnectionType;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DRPCInvocationsClient extends ThriftClient implements 
DistributedRPCInvocations.Iface {
-    public static final Logger LOG = 
LoggerFactory.getLogger(DRPCInvocationsClient.class);
-    private final AtomicReference<DistributedRPCInvocations.Client> client = 
new AtomicReference<>();
-    private String host;
-    private int port;
-
-    public DRPCInvocationsClient(Map conf, String host, int port) throws 
TTransportException {
-        super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
-        this.host = host;
-        this.port = port;
-        client.set(new DistributedRPCInvocations.Client(_protocol));
-    }
-        
-    public String getHost() {
-        return host;
-    }
-    
-    public int getPort() {
-        return port;
-    }       
-
-    public void reconnectClient() throws TException {
-        if (client.get() == null) {
-            reconnect();
-            client.set(new DistributedRPCInvocations.Client(_protocol));
-        }
-    }
-
-    public boolean isConnected() {
-        return client.get() != null;
-    }
-
-    public void result(String id, String result) throws TException, 
AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            c.result(id, result);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }
-
-    public DRPCRequest fetchRequest(String func) throws TException, 
AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            return c.fetchRequest(func);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }    
-
-    public void failRequest(String id) throws TException, 
AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            c.failRequest(id);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }
-
-    public DistributedRPCInvocations.Client getClient() {
-        return client.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java 
b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
deleted file mode 100644
index 791fc91..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.Config;
-import org.apache.storm.ILocalDRPC;
-import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.generated.DistributedRPCInvocations;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.ExtendedThreadPoolExecutor;
-import org.apache.storm.utils.ServiceRegistry;
-import org.apache.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Callable;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-
-public class DRPCSpout extends BaseRichSpout {
-    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE 
PROBLEMS
-    static final long serialVersionUID = 2387848310969237877L;
-
-    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-    
-    SpoutOutputCollector _collector;
-    List<DRPCInvocationsClient> _clients = new ArrayList<>();
-    transient LinkedList<Future<Void>> _futures = null;
-    transient ExecutorService _backround = null;
-    String _function;
-    String _local_drpc_id = null;
-    
-    private static class DRPCMessageId {
-        String id;
-        int index;
-        
-        public DRPCMessageId(String id, int index) {
-            this.id = id;
-            this.index = index;
-        }
-    }
-    
-    
-    public DRPCSpout(String function) {
-        _function = function;
-    }
-
-    public DRPCSpout(String function, ILocalDRPC drpc) {
-        _function = function;
-        _local_drpc_id = drpc.getServiceId();
-    }
-
-    public String get_function() {
-        return _function;
-    }
-
-    private class Adder implements Callable<Void> {
-        private String server;
-        private int port;
-        private Map conf;
-
-        public Adder(String server, int port, Map conf) {
-            this.server = server;
-            this.port = port;
-            this.conf = conf;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, 
port);
-            synchronized (_clients) {
-                _clients.add(c);
-            }
-            return null;
-        }
-    }
-
-    private void reconnect(final DRPCInvocationsClient c) {
-        _futures.add(_backround.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                c.reconnectClient();
-                return null;
-            }
-        }));
-    }
-
-    private void checkFutures() {
-        Iterator<Future<Void>> i = _futures.iterator();
-        while (i.hasNext()) {
-            Future<Void> f = i.next();
-            if (f.isDone()) {
-                i.remove();
-            }
-            try {
-                f.get();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
- 
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
-        _collector = collector;
-        if(_local_drpc_id==null) {
-            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>());
-            _futures = new LinkedList<>();
-
-            int numTasks = 
context.getComponentTasks(context.getThisComponentId()).size();
-            int index = context.getThisTaskIndex();
-
-            int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
-            List<String> servers = (List<String>) 
conf.get(Config.DRPC_SERVERS);
-            if(servers == null || servers.isEmpty()) {
-                throw new RuntimeException("No DRPC servers configured for 
topology");   
-            }
-            
-            if (numTasks < servers.size()) {
-                for (String s: servers) {
-                    _futures.add(_backround.submit(new Adder(s, port, conf)));
-                }
-            } else {        
-                int i = index % servers.size();
-                _futures.add(_backround.submit(new Adder(servers.get(i), port, 
conf)));
-            }
-        }
-        
-    }
-
-    @Override
-    public void close() {
-        for(DRPCInvocationsClient client: _clients) {
-            client.close();
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        boolean gotRequest = false;
-        if(_local_drpc_id==null) {
-            int size;
-            synchronized (_clients) {
-                size = _clients.size(); //This will only ever grow, so no need 
to worry about falling off the end
-            }
-            for(int i=0; i<size; i++) {
-                DRPCInvocationsClient client;
-                synchronized (_clients) {
-                    client = _clients.get(i);
-                }
-                if (!client.isConnected()) {
-                    LOG.warn("DRPCInvocationsClient [{}:{}] is not 
connected.", client.getHost(), client.getPort());
-                    reconnect(client);
-                    continue;
-                }
-                try {
-                    DRPCRequest req = client.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
-                        returnInfo.put("id", req.get_request_id());
-                        returnInfo.put("host", client.getHost());
-                        returnInfo.put("port", client.getPort());
-                        gotRequest = true;
-                        _collector.emit(new Values(req.get_func_args(), 
JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 
i));
-                        break;
-                    }
-                } catch (AuthorizationException aze) {
-                    reconnect(client);
-                    LOG.error("Not authorized to fetch DRPC result from DRPC 
server", aze);
-                } catch (TException e) {
-                    reconnect(client);
-                    LOG.error("Failed to fetch DRPC result from DRPC server", 
e);
-                } catch (Exception e) {
-                    LOG.error("Failed to fetch DRPC result from DRPC server", 
e);
-                }
-            }
-            checkFutures();
-        } else {
-            DistributedRPCInvocations.Iface drpc = 
(DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
-            if(drpc!=null) { // can happen during shutdown of drpc while 
topology is still up
-                try {
-                    DRPCRequest req = drpc.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
-                        returnInfo.put("id", req.get_request_id());
-                        returnInfo.put("host", _local_drpc_id);
-                        returnInfo.put("port", 0);
-                        gotRequest = true;
-                        _collector.emit(new Values(req.get_func_args(), 
JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 
0));
-                    }
-                } catch (AuthorizationException aze) {
-                    throw new RuntimeException(aze);
-                } catch (TException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if(!gotRequest) {
-            Utils.sleep(1);
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        DRPCMessageId did = (DRPCMessageId) msgId;
-        DistributedRPCInvocations.Iface client;
-        
-        if(_local_drpc_id == null) {
-            client = _clients.get(did.index);
-        } else {
-            client = (DistributedRPCInvocations.Iface) 
ServiceRegistry.getService(_local_drpc_id);
-        }
-        try {
-            client.failRequest(did.id);
-        } catch (AuthorizationException aze) {
-            LOG.error("Not authorized to failREquest from DRPC server", aze);
-        } catch (TException e) {
-            LOG.error("Failed to fail request", e);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("args", "return-info"));
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java 
b/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java
deleted file mode 100644
index f57bbb1..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class JoinResult extends BaseRichBolt {
-    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);
-
-    String returnComponent;
-    Map<Object, Tuple> returns = new HashMap<>();
-    Map<Object, Tuple> results = new HashMap<>();
-    OutputCollector _collector;
-
-    public JoinResult(String returnComponent) {
-        this.returnComponent = returnComponent;
-    }
- 
-    public void prepare(Map map, TopologyContext context, OutputCollector 
collector) {
-        _collector = collector;
-    }
-
-    public void execute(Tuple tuple) {
-        Object requestId = tuple.getValue(0);
-        if(tuple.getSourceComponent().equals(returnComponent)) {
-            returns.put(requestId, tuple);
-        } else {
-            results.put(requestId, tuple);
-        }
-
-        if(returns.containsKey(requestId) && results.containsKey(requestId)) {
-            Tuple result = results.remove(requestId);
-            Tuple returner = returns.remove(requestId);
-            LOG.debug(result.getValue(1).toString());
-            List<Tuple> anchors = new ArrayList<>();
-            anchors.add(result);
-            anchors.add(returner);            
-            _collector.emit(anchors, new Values(""+result.getValue(1), 
returner.getValue(1)));
-            _collector.ack(result);
-            _collector.ack(returner);
-        }
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("result", "return-info"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java 
b/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java
deleted file mode 100644
index 39860e6..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicBoltExecutor;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.KeyedRoundRobinQueue;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class KeyedFairBolt implements IRichBolt, FinishedCallback {
-    IRichBolt _delegate;
-    KeyedRoundRobinQueue<Tuple> _rrQueue;
-    Thread _executor;
-    FinishedCallback _callback;
-
-    public KeyedFairBolt(IRichBolt delegate) {
-        _delegate = delegate;
-    }
-    
-    public KeyedFairBolt(IBasicBolt delegate) {
-        this(new BasicBoltExecutor(delegate));
-    }
-    
-    
-    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        if(_delegate instanceof FinishedCallback) {
-            _callback = (FinishedCallback) _delegate;
-        }
-        _delegate.prepare(stormConf, context, collector);
-        _rrQueue = new KeyedRoundRobinQueue<Tuple>();
-        _executor = new Thread(new Runnable() {
-            public void run() {
-                try {
-                    while(true) {
-                        _delegate.execute(_rrQueue.take());
-                    }
-                } catch (InterruptedException e) {
-
-                }
-            }
-        });
-        _executor.setDaemon(true);
-        _executor.start();
-    }
-
-    public void execute(Tuple input) {
-        Object key = input.getValue(0);
-        _rrQueue.add(key, input);
-    }
-
-    public void cleanup() {
-        _executor.interrupt();
-        _delegate.cleanup();
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _delegate.declareOutputFields(declarer);
-    }
-
-    public void finishedId(Object id) {
-        if(_callback!=null) {
-            _callback.finishedId(id);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return new HashMap<String, Object>();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java 
b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
deleted file mode 100644
index 6f82f80..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.topology.ComponentConfigurationDeclarer;
-import org.apache.storm.tuple.Fields;
-
-public interface LinearDRPCInputDeclarer extends 
ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
-    public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
-    public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields 
fields);
-
-    public LinearDRPCInputDeclarer globalGrouping();
-    public LinearDRPCInputDeclarer globalGrouping(String streamId);
-
-    public LinearDRPCInputDeclarer shuffleGrouping();
-    public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
-
-    public LinearDRPCInputDeclarer localOrShuffleGrouping();
-    public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
-    
-    public LinearDRPCInputDeclarer noneGrouping();
-    public LinearDRPCInputDeclarer noneGrouping(String streamId);
-
-    public LinearDRPCInputDeclarer allGrouping();
-    public LinearDRPCInputDeclarer allGrouping(String streamId);
-
-    public LinearDRPCInputDeclarer directGrouping();
-    public LinearDRPCInputDeclarer directGrouping(String streamId);
-
-    public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
-    public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields 
fields);
-
-    public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping 
grouping);
-    public LinearDRPCInputDeclarer customGrouping(String streamId, 
CustomStreamGrouping grouping);
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java 
b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
deleted file mode 100644
index dc702a3..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.Constants;
-import org.apache.storm.ILocalDRPC;
-import org.apache.storm.coordination.BatchBoltExecutor;
-import org.apache.storm.coordination.CoordinatedBolt;
-import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
-import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec;
-import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
-import org.apache.storm.coordination.IBatchBolt;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.grouping.PartialKeyGrouping;
-import org.apache.storm.topology.BaseConfigurationDeclarer;
-import org.apache.storm.topology.BasicBoltExecutor;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.InputDeclarer;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-// Trident subsumes the functionality provided by this class, so it's 
deprecated
-@Deprecated
-public class LinearDRPCTopologyBuilder {    
-    String _function;
-    List<Component> _components = new ArrayList<Component>();
-    
-    
-    public LinearDRPCTopologyBuilder(String function) {
-        _function = function;
-    }
-        
-    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number 
parallelism) {
-        return addBolt(new BatchBoltExecutor(bolt), parallelism);
-    }
-    
-    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
-        return addBolt(bolt, 1);
-    }
-    
-    @Deprecated
-    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) 
{
-        if(parallelism==null) parallelism = 1; 
-        Component component = new Component(bolt, parallelism.intValue());
-        _components.add(component);
-        return new InputDeclarerImpl(component);
-    }
-    
-    @Deprecated
-    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
-        return addBolt(bolt, null);
-    }
-    
-    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number 
parallelism) {
-        return addBolt(new BasicBoltExecutor(bolt), parallelism);
-    }
-
-    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
-        return addBolt(bolt, null);
-    }
-        
-    public StormTopology createLocalTopology(ILocalDRPC drpc) {
-        return createTopology(new DRPCSpout(_function, drpc));
-    }
-    
-    public StormTopology createRemoteTopology() {
-        return createTopology(new DRPCSpout(_function));
-    }
-    
-    
-    private StormTopology createTopology(DRPCSpout spout) {
-        final String SPOUT_ID = "spout";
-        final String PREPARE_ID = "prepare-request";
-        
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(SPOUT_ID, spout);
-        builder.setBolt(PREPARE_ID, new PrepareRequest())
-                .noneGrouping(SPOUT_ID);
-        int i=0;
-        for(; i<_components.size();i++) {
-            Component component = _components.get(i);
-            
-            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
-            if (i==1) {
-                source.put(boltId(i-1), SourceArgs.single());
-            } else if (i>=2) {
-                source.put(boltId(i-1), SourceArgs.all());
-            }
-            IdStreamSpec idSpec = null;
-            if(i==_components.size()-1 && component.bolt instanceof 
FinishedCallback) {
-                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, 
PrepareRequest.ID_STREAM);
-            }
-            BoltDeclarer declarer = builder.setBolt(
-                    boltId(i),
-                    new CoordinatedBolt(component.bolt, source, idSpec),
-                    component.parallelism);
-            
-            for(Map<String, Object> conf: component.componentConfs) {
-                declarer.addConfigurations(conf);
-            }
-            
-            if(idSpec!=null) {
-                
declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), 
PrepareRequest.ID_STREAM, new Fields("request"));
-            }
-            if(i==0 && component.declarations.isEmpty()) {
-                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
-            } else {
-                String prevId;
-                if(i==0) {
-                    prevId = PREPARE_ID;
-                } else {
-                    prevId = boltId(i-1);
-                }
-                for(InputDeclaration declaration: component.declarations) {
-                    declaration.declare(prevId, declarer);
-                }
-            }
-            if(i>0) {
-                declarer.directGrouping(boltId(i-1), 
Constants.COORDINATED_STREAM_ID); 
-            }
-        }
-        
-        IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
-        OutputFieldsGetter getter = new OutputFieldsGetter();
-        lastBolt.declareOutputFields(getter);
-        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
-        if(streams.size()!=1) {
-            throw new RuntimeException("Must declare exactly one stream from 
last bolt in LinearDRPCTopology");
-        }
-        String outputStream = streams.keySet().iterator().next();
-        List<String> fields = streams.get(outputStream).get_output_fields();
-        if(fields.size()!=2) {
-            throw new RuntimeException("Output stream of last component in 
LinearDRPCTopology must contain exactly two fields. The first should be the 
request id, and the second should be the result.");
-        }
-
-        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
-                .fieldsGrouping(boltId(i-1), outputStream, new 
Fields(fields.get(0)))
-                .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new 
Fields("request"));
-        i++;
-        builder.setBolt(boltId(i), new ReturnResults())
-                .noneGrouping(boltId(i-1));
-        return builder.createTopology();
-    }
-    
-    private static String boltId(int index) {
-        return "bolt" + index;
-    }
-    
-    private static class Component {
-        public IRichBolt bolt;
-        public int parallelism;
-        public List<Map<String, Object>> componentConfs = new ArrayList<>();
-        public List<InputDeclaration> declarations = new 
ArrayList<InputDeclaration>();
-        
-        public Component(IRichBolt bolt, int parallelism) {
-            this.bolt = bolt;
-            this.parallelism = parallelism;
-        }
-    }
-    
-    private static interface InputDeclaration {
-        public void declare(String prevComponent, InputDeclarer declarer);
-    }
-    
-    private static class InputDeclarerImpl extends 
BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements 
LinearDRPCInputDeclarer {
-        Component _component;
-        
-        public InputDeclarerImpl(Component component) {
-            _component = component;
-        }
-        
-        @Override
-        public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.fieldsGrouping(prevComponent, fields);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, 
final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.fieldsGrouping(prevComponent, streamId, fields);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer globalGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.globalGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer globalGrouping(final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.globalGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer shuffleGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.shuffleGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.shuffleGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer localOrShuffleGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.localOrShuffleGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer localOrShuffleGrouping(final String 
streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.localOrShuffleGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-        
-        @Override
-        public LinearDRPCInputDeclarer noneGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.noneGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer noneGrouping(final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.noneGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer allGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.allGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer allGrouping(final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.allGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer directGrouping() {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.directGrouping(prevComponent);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer directGrouping(final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.directGrouping(prevComponent, streamId);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) {
-            return customGrouping(new PartialKeyGrouping(fields));
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, 
Fields fields) {
-            return customGrouping(streamId, new PartialKeyGrouping(fields));
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer customGrouping(final 
CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.customGrouping(prevComponent, grouping);
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer customGrouping(final String streamId, 
final CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(String prevComponent, InputDeclarer 
declarer) {
-                    declarer.customGrouping(prevComponent, streamId, grouping);
-                }                
-            });
-            return this;
-        }
-        
-        private void addDeclaration(InputDeclaration declaration) {
-            _component.declarations.add(declaration);
-        }
-
-        @Override
-        public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> 
conf) {
-            _component.componentConfs.add(conf);
-            return this;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java 
b/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java
deleted file mode 100644
index 15a3320..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/PrepareRequest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import java.util.Map;
-import java.util.Random;
-import org.apache.storm.utils.Utils;
-
-
-public class PrepareRequest extends BaseBasicBolt {
-    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
-    public static final String RETURN_STREAM = "ret";
-    public static final String ID_STREAM = "id";
-
-    Random rand;
-
-    @Override
-    public void prepare(Map map, TopologyContext context) {
-        rand = new Random();
-    }
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-        String args = tuple.getString(0);
-        String returnInfo = tuple.getString(1);
-        long requestId = rand.nextLong();
-        collector.emit(ARGS_STREAM, new Values(requestId, args));
-        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
-        collector.emit(ID_STREAM, new Values(requestId));
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
-        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
-        declarer.declareStream(ID_STREAM, new Fields("request"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a6171bd6/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java 
b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
deleted file mode 100644
index a9a5aa1..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.drpc;
-
-import org.apache.storm.Config;
-import org.apache.storm.generated.DistributedRPCInvocations;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.ServiceRegistry;
-import org.apache.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.json.simple.JSONValue;
-
-
-public class ReturnResults extends BaseRichBolt {
-    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE 
PROBLEMS
-    static final long serialVersionUID = -774882142710631591L;
-
-    public static final Logger LOG = 
LoggerFactory.getLogger(ReturnResults.class);
-    OutputCollector _collector;
-    boolean local;
-    Map _conf; 
-    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, 
DRPCInvocationsClient>();
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        _conf = stormConf;
-        _collector = collector;
-        local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        String result = (String) input.getValue(0);
-        String returnInfo = (String) input.getValue(1);
-        if(returnInfo!=null) {
-            Map retMap = (Map) JSONValue.parse(returnInfo);
-            final String host = (String) retMap.get("host");
-            final int port = Utils.getInt(retMap.get("port"));
-            String id = (String) retMap.get("id");
-            DistributedRPCInvocations.Iface client;
-            if(local) {
-                client = (DistributedRPCInvocations.Iface) 
ServiceRegistry.getService(host);
-            } else {
-                List server = new ArrayList() {{
-                    add(host);
-                    add(port);
-                }};
-            
-                if(!_clients.containsKey(server)) {
-                    try {
-                        _clients.put(server, new DRPCInvocationsClient(_conf, 
host, port));
-                    } catch (TTransportException ex) {
-                        throw new RuntimeException(ex);
-                    }
-                }
-                client = _clients.get(server);
-            }
- 
-
-            int retryCnt = 0;
-            int maxRetries = 3;
-            while (retryCnt < maxRetries) {
-                retryCnt++;
-                try {
-                    client.result(id, result);
-                    _collector.ack(input);
-                    break;
-                } catch (AuthorizationException aze) {
-                    LOG.error("Not authorized to return results to DRPC 
server", aze);
-                    _collector.fail(input);
-                    throw new RuntimeException(aze);
-                } catch (TException tex) {
-                    if (retryCnt >= maxRetries) {
-                        LOG.error("Failed to return results to DRPC server", 
tex);
-                        _collector.fail(input);
-                    }
-                    reconnectClient((DRPCInvocationsClient) client);
-                }
-            }
-        }
-    }    
-
-    private void reconnectClient(DRPCInvocationsClient client) {
-        if (client instanceof DRPCInvocationsClient) {
-            try {
-                LOG.info("reconnecting... ");
-                client.reconnectClient(); //Blocking call
-            } catch (TException e2) {
-                LOG.error("Failed to connect to DRPC server", e2);
-            }
-        }
-    }
-    @Override
-    public void cleanup() {
-        for(DRPCInvocationsClient c: _clients.values()) {
-            c.close();
-        }
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-}

Reply via email to