http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 43c125d..90d68dc 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -24,15 +24,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.localizer.LocalResource; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; -import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.LocalState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +41,11 @@ public class SupervisorUtils { private static final SupervisorUtils INSTANCE = new SupervisorUtils(); private static SupervisorUtils _instance = INSTANCE; + public static void setInstance(SupervisorUtils u) { _instance = u; } + public static void resetInstance() { _instance = INSTANCE; } @@ -62,9 +63,9 @@ public class SupervisorUtils { } /** - * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then - * returns false - * + * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if + * it's not specified then returns false + * * @param blobInfo * @return */ @@ -73,8 +74,8 @@ public class SupervisorUtils { } /** - * Given the blob information returns the value of the workerRestart field, handling it either being a string or a boolean value, or - * if it's not specified then returns false. + * Given the blob information returns the value of the workerRestart field, handling it either being a string or a boolean value, or if + * it's not specified then returns false. * * @param blobInfo the info for the blob. * @return true if the blob needs a worker restart by way of the callback else false. @@ -85,7 +86,7 @@ public class SupervisorUtils { /** * Returns a list of LocalResources based on the blobstore-map passed in - * + * * @param blobstoreMap * @return */ @@ -94,7 +95,8 @@ public class SupervisorUtils { if (blobstoreMap != null) { for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) { Map<String, Object> blobConf = map.getValue(); - LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(blobConf), blobNeedsWorkerRestart(blobConf)); + LocalResource localResource = + new LocalResource(map.getKey(), shouldUncompressBlob(blobConf), blobNeedsWorkerRestart(blobConf)); localResourceList.add(localResource); } } @@ -111,12 +113,30 @@ public class SupervisorUtils { * * @param conf * @return + * * @throws Exception */ public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception { return _instance.readWorkerHeartbeatsImpl(conf); } + /** + * get worker heartbeat by workerId + * + * @param conf + * @param workerId + * @return + * + * @throws IOException + */ + private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) { + return _instance.readWorkerHeartbeatImpl(conf, workerId); + } + + public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) { + return _instance.isWorkerHbTimedOutImpl(now, whb, conf); + } + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception { Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); @@ -130,19 +150,6 @@ public class SupervisorUtils { return workerHeartbeats; } - - /** - * get worker heartbeat by workerId - * - * @param conf - * @param workerId - * @return - * @throws IOException - */ - private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) { - return _instance.readWorkerHeartbeatImpl(conf, workerId); - } - protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> conf, String workerId) { try { LocalState localState = ConfigUtils.workerState(conf, workerId); @@ -153,11 +160,7 @@ public class SupervisorUtils { } } - public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) { - return _instance.isWorkerHbTimedOutImpl(now, whb, conf); - } - - private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) { + private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) { return (now - whb.get_time_secs()) > ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java index 9662af5..e531cb6 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; public interface UniFunc<T> { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java index c01819b..6e4814c 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.supervisor.timer; @@ -21,7 +15,6 @@ package org.apache.storm.daemon.supervisor.timer; import java.util.ArrayList; import java.util.List; import java.util.Map; - import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.daemon.supervisor.SupervisorUtils; import org.apache.storm.generated.LSWorkerHeartbeat; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index 9ffce91..f68cb8c 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@ -19,7 +19,6 @@ package org.apache.storm.daemon.supervisor.timer; import java.util.Map; - import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.healthcheck.HealthChecker; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index 14ecf94..24d4471 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.supervisor.timer; @@ -56,7 +50,7 @@ public class SupervisorHeartbeat implements Runnable { List<Long> usedPorts = new ArrayList<>(); usedPorts.addAll(supervisor.getCurrAssignment().get().keySet()); supervisorInfo.set_used_ports(usedPorts); - List metaDatas = (List)supervisor.getiSupervisor().getMetadata(); + List metaDatas = (List) supervisor.getiSupervisor().getMetadata(); List<Long> portList = new ArrayList<>(); if (metaDatas != null) { for (Object data : metaDatas) { @@ -85,8 +79,8 @@ public class SupervisorHeartbeat implements Runnable { // If configs are present in Generic map and legacy - the legacy values will be overwritten - Map<String, Number> rawResourcesMap = (Map<String,Number>) conf.getOrDefault( - Config.SUPERVISOR_RESOURCES_MAP, Collections.emptyMap() + Map<String, Number> rawResourcesMap = (Map<String, Number>) conf.getOrDefault( + Config.SUPERVISOR_RESOURCES_MAP, Collections.emptyMap() ); for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java index ba8d133..639c9db 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java @@ -1,26 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.supervisor.timer; import java.util.HashMap; import java.util.Map; - import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.daemon.supervisor.ReadClusterState; import org.apache.storm.daemon.supervisor.Supervisor; @@ -56,6 +49,18 @@ public class SynchronizeAssignments implements Runnable { this.readClusterState = readClusterState; } + private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) { + if (null == assignments) { + //unknown error, just skip + return; + } + Map<String, byte[]> serAssignments = new HashMap<>(); + for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) { + serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue())); + } + clusterState.syncRemoteAssignments(serAssignments); + } + @Override public void run() { // first sync assignments to local, then sync processes. @@ -118,16 +123,4 @@ public class SynchronizeAssignments implements Runnable { } } } - - private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) { - if (null == assignments) { - //unknown error, just skip - return; - } - Map<String, byte[]> serAssignments = new HashMap<>(); - for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) { - serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue())); - } - clusterState.syncRemoteAssignments(serAssignments); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/event/EventManager.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManager.java b/storm-server/src/main/java/org/apache/storm/event/EventManager.java index 64536c1..ff7342b 100644 --- a/storm-server/src/main/java/org/apache/storm/event/EventManager.java +++ b/storm-server/src/main/java/org/apache/storm/event/EventManager.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.event; public interface EventManager extends AutoCloseable { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java index 6b9d4f1..a527315 100644 --- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java +++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java @@ -1,32 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.event; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.event; import java.io.InterruptedIOException; import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventManagerImp implements EventManager { private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class); @@ -55,7 +49,7 @@ public class EventManagerImp implements EventManager { r.run(); proccessInc(); } catch (Throwable t) { - if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t) ) { + if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { LOG.info("Event manager interrupted while doing IO"); } else if (Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, t)) { LOG.info("Event manager interrupted while doing NIO"); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java index c373b05..83faf13 100644 --- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java +++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java @@ -18,12 +18,6 @@ package org.apache.storm.healthcheck; -import org.apache.storm.DaemonConfig; -import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.ServerConfigUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.File; import java.io.InputStream; @@ -32,6 +26,11 @@ import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.storm.DaemonConfig; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ServerConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HealthChecker { @@ -50,8 +49,9 @@ public class HealthChecker { if (parentFile.exists()) { File[] list = parentFile.listFiles(); for (File f : list) { - if (!f.isDirectory() && f.canExecute()) + if (!f.isDirectory() && f.canExecute()) { healthScripts.add(f.getAbsolutePath()); + } } } for (String script : healthScripts) { @@ -66,7 +66,7 @@ public class HealthChecker { // to execute properly, not that the system is unhealthy, in which case // we don't want to start killing things. - if (results.contains(FAILED)|| results.contains(FAILED_WITH_EXIT_CODE) + if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE) || results.contains(TIMEOUT)) { LOG.warn("The supervisor healthchecks failed!!!"); return 1; @@ -120,8 +120,9 @@ public class HealthChecker { LOG.warn("Script failed with exception: ", e); return FAILED_WITH_EXIT_CODE; } finally { - if (interruptThread != null) + if (interruptThread != null) { interruptThread.interrupt(); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java index 1cdcaad..8b41c68 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java @@ -29,7 +29,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -68,18 +67,19 @@ public class AsyncLocalizer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class); private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>(); + private static final int ATTEMPTS_INTERVAL_TIME = 100; static { ALL_DONE_FUTURE.complete(null); } - private final boolean isLocalMode; // track resources - user to resourceSet //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and // computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in, and would require the function to have // no side effects. protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userFiles = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userArchives = new ConcurrentHashMap<>(); + private final boolean isLocalMode; // topology to tracking of topology dir and resources private final ConcurrentHashMap<String, CompletableFuture<Void>> blobPending; private final Map<String, Object> conf; @@ -87,16 +87,14 @@ public class AsyncLocalizer implements AutoCloseable { private final boolean symlinksDisabled; private final ConcurrentHashMap<String, LocallyCachedBlob> topologyBlobs = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>(); - private final Path localBaseDir; private final int blobDownloadRetries; private final ScheduledExecutorService execService; - + private final long cacheCleanupPeriod; // cleanup @VisibleForTesting protected long cacheTargetSize; - private final long cacheCleanupPeriod; - + @VisibleForTesting AsyncLocalizer(Map<String, Object> conf, AdvancedFSOps ops, String baseDir) throws IOException { @@ -106,7 +104,7 @@ public class AsyncLocalizer implements AutoCloseable { localBaseDir = Paths.get(baseDir); // default cache size 10GB, converted to Bytes cacheTargetSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB), - 10 * 1024).longValue() << 20; + 10 * 1024).longValue() << 20; // default 30 seconds. (we cache the size so it is cheap to do) cacheCleanupPeriod = ObjectReader.getInt(conf.get( DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue(); @@ -117,10 +115,10 @@ public class AsyncLocalizer implements AutoCloseable { DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3); execService = Executors.newScheduledThreadPool(threadPoolSize, - new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build()); + new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build()); reconstructLocalizedResources(); - symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); + symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false); blobPending = new ConcurrentHashMap<>(); } @@ -131,40 +129,43 @@ public class AsyncLocalizer implements AutoCloseable { @VisibleForTesting LocallyCachedBlob getTopoJar(final String topologyId) { return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormJarKey(topologyId), - (tjk) -> { - try { - return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + (tjk) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType + .TOPO_JAR); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @VisibleForTesting LocallyCachedBlob getTopoCode(final String topologyId) { return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormCodeKey(topologyId), - (tck) -> { - try { - return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + (tck) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType + .TOPO_CODE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @VisibleForTesting LocallyCachedBlob getTopoConf(final String topologyId) { return topologyBlobs.computeIfAbsent(ConfigUtils.masterStormConfKey(topologyId), - (tck) -> { - try { - return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, - LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + (tck) -> { + try { + return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps, + LocallyCachedTopologyBlob.TopologyBlobType + .TOPO_CONF); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private LocalizedResource getUserArchive(String user, String key) { @@ -181,33 +182,35 @@ public class AsyncLocalizer implements AutoCloseable { /** * Request that all of the blobs necessary for this topology be downloaded. + * * @param assignment the assignment that needs the blobs - * @param port the port the assignment is a part of - * @param cb a callback for when the blobs change. This is only for blobs that are tied to the lifetime of the worker. + * @param port the port the assignment is a part of + * @param cb a callback for when the blobs change. This is only for blobs that are tied to the lifetime of the worker. * @return a Future that indicates when they are all downloaded. + * * @throws IOException if there was an error while trying doing it. */ public CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port, - final BlobChangingCallback cb) throws IOException { + final BlobChangingCallback cb) throws IOException { final PortAndAssignment pna = new PortAndAssignment(port, assignment); final String topologyId = pna.getToplogyId(); CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(pna, cb); return baseBlobs.thenComposeAsync((v) -> - blobPending.compute(topologyId, (tid, old) -> { - CompletableFuture<Void> ret = old; - if (ret == null) { - ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService); - } else { - try { - addReferencesToBlobs(pna, cb); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - LOG.debug("Reserved blobs {} {}", topologyId, ret); - return ret; - })); + blobPending.compute(topologyId, (tid, old) -> { + CompletableFuture<Void> ret = old; + if (ret == null) { + ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService); + } else { + try { + addReferencesToBlobs(pna, cb); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + LOG.debug("Reserved blobs {} {}", topologyId, ret); + return ret; + })); } @VisibleForTesting @@ -225,19 +228,17 @@ public class AsyncLocalizer implements AutoCloseable { topoConf.addReference(pna, cb); return topologyBasicDownloaded.computeIfAbsent(topologyId, - (tid) -> downloadOrUpdate(topoJar, topoCode, topoConf)); + (tid) -> downloadOrUpdate(topoJar, topoCode, topoConf)); } - private static final int ATTEMPTS_INTERVAL_TIME = 100; - - private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ... blobs) { + private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob... blobs) { return downloadOrUpdate(Arrays.asList(blobs)); } private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) { - CompletableFuture<Void> [] all = new CompletableFuture[blobs.size()]; + CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()]; int i = 0; - for (final LocallyCachedBlob blob: blobs) { + for (final LocallyCachedBlob blob : blobs) { all[i] = CompletableFuture.runAsync(() -> { LOG.debug("STARTING download of {}", blob); try (ClientBlobStore blobStore = getClientBlobStore()) { @@ -296,7 +297,7 @@ public class AsyncLocalizer implements AutoCloseable { futures.add(downloadOrUpdate(map.values())); } } - for (CompletableFuture<?> f: futures) { + for (CompletableFuture<?> f : futures) { try { f.get(); } catch (Exception e) { @@ -312,8 +313,7 @@ public class AsyncLocalizer implements AutoCloseable { } /** - * Start any background threads needed. This includes updating blobs and cleaning up - * unused blobs over the configured size limit. + * Start any background threads needed. This includes updating blobs and cleaning up unused blobs over the configured size limit. */ public void start() { execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS); @@ -366,76 +366,17 @@ public class AsyncLocalizer implements AutoCloseable { } } - private class DownloadBlobs implements Supplier<Void> { - private final PortAndAssignment pna; - private final BlobChangingCallback cb; - - public DownloadBlobs(PortAndAssignment pna, BlobChangingCallback cb) { - this.pna = pna; - this.cb = cb; - } - - @Override - public Void get() { - try { - String topologyId = pna.getToplogyId(); - String topoOwner = pna.getOwner(); - String stormroot = ConfigUtils.supervisorStormDistRoot(conf, topologyId); - Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId); - - @SuppressWarnings("unchecked") - Map<String, Map<String, Object>> blobstoreMap = - (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - - List<LocalResource> localResourceList = getLocalResources(pna); - if (!localResourceList.isEmpty()) { - File userDir = getLocalUserFileCacheDir(topoOwner); - if (!fsOps.fileExists(userDir)) { - fsOps.forceMkdir(userDir); - } - List<LocalizedResource> localizedResources = getBlobs(localResourceList, pna, cb); - fsOps.setupBlobPermissions(userDir, topoOwner); - if (!symlinksDisabled) { - for (LocalizedResource localizedResource : localizedResources) { - String keyName = localizedResource.getKey(); - //The sym link we are pointing to - File rsrcFilePath = localizedResource.getCurrentSymlinkPath().toFile(); - - String symlinkName = null; - if (blobstoreMap != null) { - Map<String, Object> blobInfo = blobstoreMap.get(keyName); - if (blobInfo != null && blobInfo.containsKey("localname")) { - symlinkName = (String) blobInfo.get("localname"); - } else { - symlinkName = keyName; - } - } else { - // all things are from dependencies - symlinkName = keyName; - } - fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); - } - } - } - - return null; - } catch (Exception e) { - LOG.warn("Caught Exception While Downloading (rethrowing)... ", e); - throw new RuntimeException(e); - } - } - } - /** * Do everything needed to recover the state in the AsyncLocalizer for a running topology. + * * @param currentAssignment the assignment for the topology. - * @param port the port the assignment is on. - * @param cb a callback for when the blobs are updated. This will only be for blobs that - * indicate that if they change the worker should be restarted. + * @param port the port the assignment is on. + * @param cb a callback for when the blobs are updated. This will only be for blobs that indicate that if they change + * the worker should be restarted. * @throws IOException on any error trying to recover the state. */ public void recoverRunningTopology(final LocalAssignment currentAssignment, final int port, - final BlobChangingCallback cb) throws IOException { + final BlobChangingCallback cb) throws IOException { final PortAndAssignment pna = new PortAndAssignment(port, currentAssignment); final String topologyId = pna.getToplogyId(); @@ -463,7 +404,7 @@ public class AsyncLocalizer implements AutoCloseable { * Remove this assignment/port as blocking resources from being cleaned up. * * @param assignment the assignment the resources are for - * @param port the port the topology is running on + * @param port the port the topology is running on * @throws IOException on any error */ public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException { @@ -512,13 +453,13 @@ public class AsyncLocalizer implements AutoCloseable { } private void recoverLocalizedArchivesForUser(String user) throws IOException { - for (String key: LocalizedResource.getLocalizedArchiveKeys(localBaseDir, user)) { + for (String key : LocalizedResource.getLocalizedArchiveKeys(localBaseDir, user)) { getUserArchive(user, key); } } private void recoverLocalizedFilesForUser(String user) throws IOException { - for (String key: LocalizedResource.getLocalizedFileKeys(localBaseDir, user)) { + for (String key : LocalizedResource.getLocalizedFileKeys(localBaseDir, user)) { getUserFile(user, key); } } @@ -544,7 +485,7 @@ public class AsyncLocalizer implements AutoCloseable { // ignores invalid user/topo/key void removeBlobReference(String key, PortAndAssignment pna, - boolean uncompress) throws AuthorizationException, KeyNotFoundException { + boolean uncompress) throws AuthorizationException, KeyNotFoundException { String user = pna.getOwner(); String topo = pna.getToplogyId(); ConcurrentMap<String, LocalizedResource> lrsrcSet = uncompress ? userArchives.get(user) : userFiles.get(user); @@ -555,11 +496,11 @@ public class AsyncLocalizer implements AutoCloseable { lrsrc.removeReference(pna); } else { LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user - + " topo: " + topo); + + " topo: " + topo); } } else { LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: " - + key + " topo: " + topo); + + key + " topo: " + topo); } } @@ -568,9 +509,8 @@ public class AsyncLocalizer implements AutoCloseable { } /** - * This function either returns the blobs in the existing cache or if they don't exist in the - * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) - * and will block until all of them have been downloaded. + * This function either returns the blobs in the existing cache or if they don't exist in the cache, it downloads them in parallel (up + * to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) and will block until all of them have been downloaded. */ List<LocalizedResource> getBlobs(List<LocalResource> localResources, PortAndAssignment pna, BlobChangingCallback cb) throws AuthorizationException, KeyNotFoundException, IOException { @@ -582,7 +522,7 @@ public class AsyncLocalizer implements AutoCloseable { List<CompletableFuture<?>> futures = new ArrayList<>(); try { - for (LocalResource localResource: localResources) { + for (LocalResource localResource : localResources) { String key = localResource.getBlobName(); boolean uncompress = localResource.shouldUncompress(); LocalizedResource lrsrc = uncompress ? getUserArchive(user, key) : getUserFile(user, key); @@ -595,7 +535,7 @@ public class AsyncLocalizer implements AutoCloseable { lrsrc.addReference(pna, localResource.needsCallback() ? cb : null); } - for (CompletableFuture<?> futureRsrc: futures) { + for (CompletableFuture<?> futureRsrc : futures) { futureRsrc.get(); } } catch (ExecutionException e) { @@ -610,10 +550,6 @@ public class AsyncLocalizer implements AutoCloseable { return results; } - private interface ConsumePathAndId { - void accept(Path path, String topologyId) throws IOException; - } - private void forEachTopologyDistDir(ConsumePathAndId consumer) throws IOException { Path stormCodeRoot = Paths.get(ConfigUtils.supervisorStormDistRoot(conf)); if (Files.exists(stormCodeRoot) && Files.isDirectory(stormCodeRoot)) { @@ -669,7 +605,7 @@ public class AsyncLocalizer implements AutoCloseable { LOG.debug("Resource cleanup: {}", toClean); Set<String> allUsers = new HashSet<>(userArchives.keySet()); allUsers.addAll(userFiles.keySet()); - for (String user: allUsers) { + for (String user : allUsers) { ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user); ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user); if ((filesForUser == null || filesForUser.size() == 0) @@ -686,4 +622,68 @@ public class AsyncLocalizer implements AutoCloseable { } } } + + private interface ConsumePathAndId { + void accept(Path path, String topologyId) throws IOException; + } + + private class DownloadBlobs implements Supplier<Void> { + private final PortAndAssignment pna; + private final BlobChangingCallback cb; + + public DownloadBlobs(PortAndAssignment pna, BlobChangingCallback cb) { + this.pna = pna; + this.cb = cb; + } + + @Override + public Void get() { + try { + String topologyId = pna.getToplogyId(); + String topoOwner = pna.getOwner(); + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, topologyId); + Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId); + + @SuppressWarnings("unchecked") + Map<String, Map<String, Object>> blobstoreMap = + (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + + List<LocalResource> localResourceList = getLocalResources(pna); + if (!localResourceList.isEmpty()) { + File userDir = getLocalUserFileCacheDir(topoOwner); + if (!fsOps.fileExists(userDir)) { + fsOps.forceMkdir(userDir); + } + List<LocalizedResource> localizedResources = getBlobs(localResourceList, pna, cb); + fsOps.setupBlobPermissions(userDir, topoOwner); + if (!symlinksDisabled) { + for (LocalizedResource localizedResource : localizedResources) { + String keyName = localizedResource.getKey(); + //The sym link we are pointing to + File rsrcFilePath = localizedResource.getCurrentSymlinkPath().toFile(); + + String symlinkName = null; + if (blobstoreMap != null) { + Map<String, Object> blobInfo = blobstoreMap.get(keyName); + if (blobInfo != null && blobInfo.containsKey("localname")) { + symlinkName = (String) blobInfo.get("localname"); + } else { + symlinkName = keyName; + } + } else { + // all things are from dependencies + symlinkName = keyName; + } + fsOps.createSymlink(new File(stormroot, symlinkName), rsrcFilePath); + } + } + } + + return null; + } catch (Exception e) { + LOG.warn("Caught Exception While Downloading (rethrowing)... ", e); + throw new RuntimeException(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java b/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java index e99f95b..ebbbdc5 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java @@ -26,15 +26,15 @@ import org.apache.storm.generated.LocalAssignment; public interface BlobChangingCallback { /** - * Informs the listener that a blob has changed and is ready to update and replace a localized blob that has been marked as - * tied to the life cycle of the worker process. + * Informs the listener that a blob has changed and is ready to update and replace a localized blob that has been marked as tied to the + * life cycle of the worker process. + * + * If `go.getLatch()` is never called before the method completes it is assumed that the listener is good with the blob changing. * - * If `go.getLatch()` is never called before the method completes it is assumed that - * the listener is good with the blob changing. * @param assignment the assignment this resource and callback are registered with. - * @param port the port that this resource and callback are registered with. - * @param blob the blob that is going to change. - * @param go a way to indicate if the listener is ready for the resource to change. + * @param port the port that this resource and callback are registered with. + * @param blob the blob that is going to change. + * @param go a way to indicate if the listener is ready for the resource to change. */ void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob blob, GoodToGo go); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java b/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java index 2fffe21..9217388 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java @@ -22,39 +22,20 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** - * Used as a way to give feedback that the listener is ready for the caller to change the blob. - * By calling @{link GoodToGo#getLatch()} the listener indicates that it wants to block - * changing the blob until the CountDownLatch is triggered with a call to @{link CountDownLatch#countDown()}. + * Used as a way to give feedback that the listener is ready for the caller to change the blob. By calling @{link GoodToGo#getLatch()} the + * listener indicates that it wants to block changing the blob until the CountDownLatch is triggered with a call to @{link + * CountDownLatch#countDown()}. */ public class GoodToGo { - public static class GoodToGoLatch { - private final CountDownLatch latch; - private final Future<Void> doneChanging; - private boolean wasCounted = false; - - public GoodToGoLatch(CountDownLatch latch, Future<Void> doneChanging) { - this.latch = latch; - this.doneChanging = doneChanging; - } - - public synchronized Future<Void> countDown() { - if (!wasCounted) { - latch.countDown(); - wasCounted = true; - } - return doneChanging; - } - } - private final GoodToGoLatch latch; private boolean gotLatch = false; - public GoodToGo(CountDownLatch latch, Future<Void> doneChanging) { this.latch = new GoodToGoLatch(latch, doneChanging); } /** * Get the latch and indicate that you want to block the blob being changed. + * * @return the latch to use when you are ready. */ public synchronized GoodToGoLatch getLatch() { @@ -67,4 +48,23 @@ public class GoodToGo { latch.countDown(); } } + + public static class GoodToGoLatch { + private final CountDownLatch latch; + private final Future<Void> doneChanging; + private boolean wasCounted = false; + + public GoodToGoLatch(CountDownLatch latch, Future<Void> doneChanging) { + this.latch = latch; + this.doneChanging = doneChanging; + } + + public synchronized Future<Void> countDown() { + if (!wasCounted) { + latch.countDown(); + wasCounted = true; + } + return doneChanging; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java index 862349b..f5dfc77 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java index 75a2d6d..6e0061d 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java @@ -18,8 +18,6 @@ package org.apache.storm.localizer; -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; - import com.google.common.annotations.VisibleForTesting; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -57,13 +55,13 @@ import org.apache.storm.utils.ShellUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; + /** - * Represents a resource that is localized on the supervisor. - * A localized resource has a .current symlink to the current version file which is named - * filename.{current version}. There is also a filename.version which contains the latest version. + * Represents a resource that is localized on the supervisor. A localized resource has a .current symlink to the current version file which + * is named filename.{current version}. There is also a filename.version which contains the latest version. */ public class LocalizedResource extends LocallyCachedBlob { - private static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class); @VisibleForTesting static final String CURRENT_BLOB_SUFFIX = ".current"; @VisibleForTesting @@ -77,7 +75,34 @@ public class LocalizedResource extends LocallyCachedBlob { static final String FILESDIR = "files"; @VisibleForTesting static final String ARCHIVESDIR = "archives"; + private static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class); private static final String TO_UNCOMPRESS = "_tmp_"; + private static final Pattern VERSION_FILE_PATTERN = Pattern.compile("^(.+)\\.(\\d+)$"); + // filesystem path to the resource + private final Path baseDir; + private final Path versionFilePath; + private final Path symlinkPath; + private final boolean uncompressed; + private final IAdvancedFSOps fsOps; + private final String user; + private final Map<String, Object> conf; + // size of the resource + private long size = -1; + + LocalizedResource(String key, Path localBaseDir, boolean uncompressed, IAdvancedFSOps fsOps, Map<String, Object> conf, + String user) { + super(key + (uncompressed ? " archive" : " file"), key); + Path base = getLocalUserFileCacheDir(localBaseDir, user); + this.baseDir = uncompressed ? getCacheDirForArchives(base) : getCacheDirForFiles(base); + this.conf = conf; + this.user = user; + this.fsOps = fsOps; + versionFilePath = constructVersionFileName(baseDir, key); + symlinkPath = constructBlobCurrentSymlinkName(baseDir, key); + this.uncompressed = uncompressed; + //Set the size in case we are recovering an already downloaded object + setSize(); + } private static Path constructVersionFileName(Path baseDir, String key) { return baseDir.resolve(key + BLOB_VERSION_SUFFIX); @@ -141,16 +166,16 @@ public class LocalizedResource extends LocallyCachedBlob { return Collections.emptyList(); } return Files.list(dir) - .map((p) -> p.getFileName().toString()) - .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX)) - .map((key) -> { - int p = key.lastIndexOf('.'); - if (p > 0) { - key = key.substring(0, p); - } - return key; - }) - .collect(Collectors.toList()); + .map((p) -> p.getFileName().toString()) + .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX)) + .map((key) -> { + int p = key.lastIndexOf('.'); + if (p > 0) { + key = key.substring(0, p); + } + return key; + }) + .collect(Collectors.toList()); } // baseDir/supervisor/usercache/ @@ -179,32 +204,6 @@ public class LocalizedResource extends LocallyCachedBlob { return dir.resolve(ARCHIVESDIR); } - // filesystem path to the resource - private final Path baseDir; - private final Path versionFilePath; - private final Path symlinkPath; - private final boolean uncompressed; - private final IAdvancedFSOps fsOps; - private final String user; - // size of the resource - private long size = -1; - private final Map<String, Object> conf; - - LocalizedResource(String key, Path localBaseDir, boolean uncompressed, IAdvancedFSOps fsOps, Map<String, Object> conf, - String user) { - super(key + (uncompressed ? " archive" : " file"), key); - Path base = getLocalUserFileCacheDir(localBaseDir, user); - this.baseDir = uncompressed ? getCacheDirForArchives(base) : getCacheDirForFiles(base); - this.conf = conf; - this.user = user; - this.fsOps = fsOps; - versionFilePath = constructVersionFileName(baseDir, key); - symlinkPath = constructBlobCurrentSymlinkName(baseDir, key); - this.uncompressed = uncompressed; - //Set the size in case we are recovering an already downloaded object - setSize(); - } - Path getCurrentSymlinkPath() { return symlinkPath; } @@ -331,7 +330,7 @@ public class LocalizedResource extends LocallyCachedBlob { LOG.warn("Exit code from worker-launcher is: {}", exitCode, e); LOG.debug("output: {}", shExec.getOutput()); throw new IOException("Setting blob permissions failed" - + " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); + + " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); } } @@ -343,8 +342,6 @@ public class LocalizedResource extends LocallyCachedBlob { return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey() + CURRENT_BLOB_SUFFIX)); } - private static final Pattern VERSION_FILE_PATTERN = Pattern.compile("^(.+)\\.(\\d+)$"); - @Override public void cleanupOrphanedData() throws IOException { //There are a few possible files that we would want to clean up @@ -434,14 +431,14 @@ public class LocalizedResource extends LocallyCachedBlob { @Override public boolean isFullyDownloaded() { return Files.exists(getFilePathWithVersion()) - && Files.exists(getCurrentSymlinkPath()) - && Files.exists(versionFilePath); + && Files.exists(getCurrentSymlinkPath()) + && Files.exists(versionFilePath); } @Override public boolean equals(Object other) { if (other instanceof LocalizedResource) { - LocalizedResource l = (LocalizedResource)other; + LocalizedResource l = (LocalizedResource) other; return getKey().equals(l.getKey()) && uncompressed == l.uncompressed && baseDir.equals(l.baseDir); } return false; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java index 936dbc1..15eba02 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; @@ -37,11 +31,11 @@ import org.slf4j.LoggerFactory; */ public class LocalizedResourceRetentionSet { public static final Logger LOG = LoggerFactory.getLogger(LocalizedResourceRetentionSet.class); + @VisibleForTesting + final SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> noReferences; private long currentSize; // targetSize in Bytes private long targetSize; - @VisibleForTesting - final SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> noReferences; LocalizedResourceRetentionSet(long targetSize) { this(targetSize, new LRUComparator()); @@ -68,7 +62,7 @@ public class LocalizedResourceRetentionSet { * if they are deleted on disk too. */ public void addResources(ConcurrentMap<String, ? extends LocallyCachedBlob> blobs) { - for (LocallyCachedBlob b: blobs.values()) { + for (LocallyCachedBlob b : blobs.values()) { currentSize += b.getSizeOnDisk(); if (b.isUsed()) { LOG.debug("NOT going to clean up {}, {} depends on it", b.getKey(), b.getDependencies()); @@ -90,7 +84,7 @@ public class LocalizedResourceRetentionSet { long bytesOver = currentSize - targetSize; //First delete everything that no longer exists... for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator(); - i.hasNext();) { + i.hasNext(); ) { Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next(); LocallyCachedBlob resource = rsrc.getKey(); try { @@ -109,7 +103,7 @@ public class LocalizedResourceRetentionSet { } for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator(); - bytesOver > 0 && i.hasNext();) { + bytesOver > 0 && i.hasNext(); ) { Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next(); LocallyCachedBlob resource = rsrc.getKey(); Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java index 1cbb221..d112ee5 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; @@ -43,16 +37,15 @@ import org.slf4j.LoggerFactory; * Represents a blob that is cached locally on disk by the supervisor. */ public abstract class LocallyCachedBlob { - private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class); public static final long NOT_DOWNLOADED_VERSION = -1; + private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class); // A callback that does nothing. private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> { }; - - private long lastUsed = Time.currentTimeMillis(); private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>(); private final String blobDescription; private final String blobKey; + private long lastUsed = Time.currentTimeMillis(); private CompletableFuture<Void> doneUpdating = null; /** @@ -66,28 +59,8 @@ public abstract class LocallyCachedBlob { this.blobKey = blobKey; } - /** - * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION should be - * returned. PRECONDITION: this can only be called with a lock on this instance held. - */ - public abstract long getLocalVersion(); - - /** - * Get the version of the blob in the blob store. PRECONDITION: this can only be called with a lock on this instance held. - */ - public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException; - - /** - * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location. - * PRECONDITION: this can only be called with a lock on this instance held. - * - * @param store the store to us to download the data. - * @return the version that was downloaded. - */ - public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException; - protected static long downloadToTempLocation(ClientBlobStore store, String key, long currentVersion, IAdvancedFSOps fsOps, - Function<Long, Path> getTempPath) + Function<Long, Path> getTempPath) throws KeyNotFoundException, AuthorizationException, IOException { try (InputStreamWithMeta in = store.getBlob(key)) { long newVersion = in.getVersion(); @@ -116,6 +89,51 @@ public abstract class LocallyCachedBlob { } /** + * Get the size of p in bytes. + * @param p the path to read. + * @return the size of p in bytes. + */ + protected static long getSizeOnDisk(Path p) throws IOException { + if (!Files.exists(p)) { + return 0; + } else if (Files.isRegularFile(p)) { + return Files.size(p); + } else { + //We will not follow sym links + return Files.walk(p) + .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS)) + .mapToLong((subp) -> { + try { + return Files.size(subp); + } catch (IOException e) { + LOG.warn("Could not get the size of "); + } + return 0; + }).sum(); + } + } + + /** + * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION should be + * returned. PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract long getLocalVersion(); + + /** + * Get the version of the blob in the blob store. PRECONDITION: this can only be called with a lock on this instance held. + */ + public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException; + + /** + * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location. + * PRECONDITION: this can only be called with a lock on this instance held. + * + * @param store the store to us to download the data. + * @return the version that was downloaded. + */ + public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException; + + /** * Commit the new version and make it available for the end user. * PRECONDITION: uncompressToTempLocationIfNeeded will have been called. * PRECONDITION: this can only be called with a lock on this instance held. @@ -145,31 +163,6 @@ public abstract class LocallyCachedBlob { public abstract long getSizeOnDisk(); /** - * Get the size of p in bytes. - * @param p the path to read. - * @return the size of p in bytes. - */ - protected static long getSizeOnDisk(Path p) throws IOException { - if (!Files.exists(p)) { - return 0; - } else if (Files.isRegularFile(p)) { - return Files.size(p); - } else { - //We will not follow sym links - return Files.walk(p) - .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS)) - .mapToLong((subp) -> { - try { - return Files.size(subp); - } catch (IOException e) { - LOG.warn("Could not get the size of "); - } - return 0; - }).sum(); - } - } - - /** * Updates the last updated time. This should be called when references are added or removed. */ protected synchronized void touch() {
