[
https://issues.apache.org/jira/browse/STORM-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413041#comment-15413041
]
ASF GitHub Bot commented on STORM-2016:
---------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/1608#discussion_r74005317
--- Diff:
storm-core/src/jvm/org/apache/storm/dependency/DependencyUploader.java ---
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.dependency;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreUtils;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class DependencyUploader {
+ public static final Logger LOG =
LoggerFactory.getLogger(DependencyUploader.class);
+
+ private final Map<String, Object> conf;
+ private ClientBlobStore blobStore;
+
+ public DependencyUploader() {
+ conf = Utils.readStormConfig();
+ }
+
+ public void init() {
+ if (blobStore == null) {
+ blobStore = Utils.getClientBlobStore(conf);
+ }
+ }
+
+ public void shutdown() {
+ if (blobStore != null) {
+ blobStore.shutdown();
+ }
+ }
+
+ @VisibleForTesting
+ void setBlobStore(ClientBlobStore blobStore) {
+ this.blobStore = blobStore;
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<String> uploadFiles(List<File> dependencies, boolean
cleanupIfFails) throws IOException, AuthorizationException {
+ checkFilesExist(dependencies);
+
+ List<String> keys = new ArrayList<>(dependencies.size());
+ try {
+ for (File dependency : dependencies) {
+ String fileName = dependency.getName();
+ String key =
BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));
+
+ try {
+ uploadDependencyToBlobStore(key, dependency);
+ } catch (KeyAlreadyExistsException e) {
+ // it should never happened since we apply UUID
+ throw new RuntimeException(e);
+ }
+
+ keys.add(key);
+ }
+ } catch (Throwable e) {
+ if (blobStore != null && cleanupIfFails) {
+ deleteBlobs(keys);
+ }
+ throw new RuntimeException(e);
+ }
+
+ return keys;
+ }
+
+ public List<String> uploadArtifacts(Map<String, File> artifacts) {
+ checkFilesExist(artifacts.values());
+
+ List<String> keys = new ArrayList<>(artifacts.size());
+ try {
+ for (Map.Entry<String, File> artifactToFile :
artifacts.entrySet()) {
+ String artifact = artifactToFile.getKey();
+ File dependency = artifactToFile.getValue();
+
+ String key =
BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
+ try {
+ uploadDependencyToBlobStore(key, dependency);
+ } catch (KeyAlreadyExistsException e) {
+ // we lose the race, but it doesn't matter
+ }
+
+ keys.add(key);
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ return keys;
+ }
+
+ private String convertArtifactToJarFileName(String artifact) {
+ return artifact.replace(":", "-") + ".jar";
+ }
+
+ private boolean uploadDependencyToBlobStore(String key, File
dependency)
+ throws KeyAlreadyExistsException, AuthorizationException,
IOException {
+
+ boolean uploadNew = false;
+ try {
+ // FIXME: we can filter by listKeys() with local blobstore
when STORM-1986 is going to be resolved
+ // as a workaround, we call getBlobMeta() for all keys
+ blobStore.getBlobMeta(key);
+ } catch (KeyNotFoundException e) {
+ // TODO: do we want to add ACL here?
--- End diff --
Before that, we need to check that nimbus and the supervisor can access these
blobs. I haven't looked deeply into the blobstore, so I would like someone to
confirm this.
@revans2 Could you give some guidance on this?
> Topology submission improvement: support adding local jars and maven
> artifacts on submission
> --------------------------------------------------------------------------------------------
>
> Key: STORM-2016
> URL: https://issues.apache.org/jira/browse/STORM-2016
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
>
> This JIRA tracks the actual work on the proposal / design document below.
> https://cwiki.apache.org/confluence/display/STORM/A.+Design+doc%3A+adding+jars+and+maven+artifacts+at+submission
> Proposal discussion thread is here:
> http://mail-archives.apache.org/mod_mbox/storm-dev/201608.mbox/%3ccaf5108i9+tjanz0lgrktmkvqel7f+53k9uyzxct6zhsu6oh...@mail.gmail.com%3E
> Let's post on the discussion thread if we have any opinions / ideas on this
> instead of leaving comments on this issue.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)