This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 7972b77 GEODE-4029: Deployed jars may not be correct when multiple locators a… (#1103) 7972b77 is described below commit 7972b77d7790c63ee3f4d0a378a0efec38eaf372 Author: Jens Deppe <jde...@pivotal.io> AuthorDate: Mon Dec 4 11:13:27 2017 -0800 GEODE-4029: Deployed jars may not be correct when multiple locators a… (#1103) * GEODE-4029: Deployed jars may not be correct when multiple locators are in use * GEODE-4029: Review updates --- .../internal/ClusterConfigurationService.java | 82 ++++++++++++++++------ .../callbacks/ConfigurationChangeListener.java | 68 ++++++++++-------- .../configuration/functions/UploadJarFunction.java | 17 +++-- .../commands/DeployCommandRedeployDUnitTest.java | 26 +++++++ 4 files changed, 134 insertions(+), 59 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java index 16b86aa..beb6ae5 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java @@ -27,6 +27,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.nio.file.Path; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -35,7 +36,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -319,9 +319,12 @@ public class ClusterConfigurationService { // will need the jars on file to upload to other locators. 
Need to update the jars // using a new copy of the Configuration so that the change listener will pick up the jar // name changes. + + String memberId = cache.getMyId().getId(); + Configuration configurationCopy = new Configuration(configuration); configurationCopy.addJarNames(jarNames); - configRegion.put(group, configurationCopy); + configRegion.put(group, configurationCopy, memberId); } } catch (Exception e) { success = false; @@ -352,6 +355,20 @@ public class ClusterConfigurationService { if (configuration == null) { break; } + + for (String jarRemoved : jarNames) { + File jar = this.getPathToJarOnThisLocator(group, jarRemoved).toFile(); + if (jar.exists()) { + try { + FileUtils.forceDelete(jar); + } catch (IOException e) { + logger.error( + "Exception occurred while attempting to delete a jar from the filesystem: {}", + jarRemoved, e); + } + } + } + Configuration configurationCopy = new Configuration(configuration); configurationCopy.removeJarNames(jarNames); configRegion.put(group, configurationCopy); @@ -382,27 +399,58 @@ public class ClusterConfigurationService { return FileUtils.readFileToByteArray(jar); } - // used in the cluster config change listener when jarnames are changed in the internal region + // Only used when a locator is initially starting up public void downloadJarFromOtherLocators(String groupName, String jarName) throws IllegalStateException, IOException { logger.info("Getting Jar files from other locators"); DM dm = this.cache.getDistributionManager(); DistributedMember me = this.cache.getMyId(); - Set<DistributedMember> locators = - new HashSet<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet()); + List<DistributedMember> locators = + new ArrayList<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet()); locators.remove(me); createConfigDirIfNecessary(groupName); - byte[] jarBytes = locators.stream() - .map((DistributedMember locator) -> downloadJarFromLocator(locator, groupName, jarName)) - 
.filter(Objects::nonNull).findFirst().orElseThrow(() -> new IllegalStateException( - "No locators have a deployed jar named " + jarName + " in " + groupName)); + if (locators.isEmpty()) { + throw new IllegalStateException( + "Request to download jar " + jarName + " but no other locators are present"); + } + + byte[] jarBytes = downloadJar(locators.get(0), groupName, jarName); File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile(); FileUtils.writeByteArrayToFile(jarToWrite, jarBytes); } + // used in the cluster config change listener when jarnames are changed in the internal region + public void downloadJarFromLocator(String groupName, String jarName, + DistributedMember sourceLocator) throws IllegalStateException, IOException { + logger.info("Downloading jar {} from locator {}", jarName, sourceLocator.getName()); + + createConfigDirIfNecessary(groupName); + + byte[] jarBytes = downloadJar(sourceLocator, groupName, jarName); + + if (jarBytes == null) { + throw new IllegalStateException("Could not download jar " + jarName + " in " + groupName + + " from " + sourceLocator.getName()); + } + + File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile(); + FileUtils.writeByteArrayToFile(jarToWrite, jarBytes); + } + + private byte[] downloadJar(DistributedMember locator, String groupName, String jarName) { + ResultCollector<byte[], List<byte[]>> rc = + (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new UploadJarFunction(), + new Object[] {groupName, jarName}, Collections.singleton(locator)); + + List<byte[]> result = rc.getResult(); + + // we should only get one byte[] back in the list + return result.get(0); + } + // used when creating cluster config response public Map<String, byte[]> getAllJarsFromThisLocator(Set<String> groups) throws IOException { Map<String, byte[]> jarNamesToJarBytes = new HashMap<>(); @@ -609,7 +657,9 @@ public class ClusterConfigurationService { } Region<String, Configuration> clusterRegion = 
getConfigurationRegion(); clusterRegion.clear(); - clusterRegion.putAll(sharedConfiguration); + + String memberId = cache.getMyId().getId(); + clusterRegion.putAll(sharedConfiguration, memberId); // Overwrite the security settings using the locator's properties, ignoring whatever // in the import @@ -656,18 +706,6 @@ public class ClusterConfigurationService { this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME); } - private byte[] downloadJarFromLocator(DistributedMember locator, String groupName, - String jarName) { - ResultCollector<byte[], List<byte[]>> rc = - (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new UploadJarFunction(), - new Object[] {groupName, jarName}, Collections.singleton(locator)); - - List<byte[]> result = rc.getResult(); - - // we should only get one byte[] back in the list - return result.stream().filter(Objects::nonNull).findFirst().orElse(null); - } - /** * Gets the region containing the shared configuration data. The region is created , if it does * not exist already. 
Note : this could block if this locator contains stale persistent diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java index c68664e..1c09334 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java @@ -17,14 +17,22 @@ package org.apache.geode.management.internal.configuration.callbacks; import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Logger; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.GemFireCache; import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterConfigurationService; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.logging.LogService; import org.apache.geode.management.internal.configuration.domain.Configuration; @@ -41,48 +49,37 @@ public class ConfigurationChangeListener extends CacheListenerAdapter<String, Co this.sharedConfig = sharedConfig; } + // Don't process the event locally. The action of adding or removing a jar should already have + // been performed by DeployCommand or UndeployCommand. 
@Override public void afterUpdate(EntryEvent<String, Configuration> event) { super.afterUpdate(event); - addOrRemoveJarFromFilesystem(event); + if (event.isOriginRemote()) { + addOrRemoveJarFromFilesystem(event); + } } @Override public void afterCreate(EntryEvent<String, Configuration> event) { super.afterCreate(event); - addOrRemoveJarFromFilesystem(event); + if (event.isOriginRemote()) { + addOrRemoveJarFromFilesystem(event); + } } - // when a new jar is added, if it does not exist in the current locator, download it from - // another locator. - // when a jar is removed, if it exists in the current locator, remove it. + // Here we first remove any jars which are not used anymore and then we re-add all of the + // necessary jars again. This may appear a bit blunt but it also accounts for the situation + // where a jar is only being updated - i.e. the name does not change, only the content. private void addOrRemoveJarFromFilesystem(EntryEvent<String, Configuration> event) { String group = event.getKey(); - Configuration newConfig = (Configuration) event.getNewValue(); - Configuration oldConfig = (Configuration) event.getOldValue(); + Configuration newConfig = event.getNewValue(); + Configuration oldConfig = event.getOldValue(); Set<String> newJars = newConfig.getJarNames(); Set<String> oldJars = (oldConfig == null) ? 
new HashSet<>() : oldConfig.getJarNames(); - Set<String> jarsAdded = new HashSet<>(newJars); - Set<String> jarsRemoved = new HashSet<>(oldJars); - jarsAdded.removeAll(oldJars); + Set<String> jarsRemoved = new HashSet<>(oldJars); jarsRemoved.removeAll(newJars); - if (!jarsAdded.isEmpty() && !jarsRemoved.isEmpty()) { - throw new IllegalStateException( - "We don't expect to have jars both added and removed in one event"); - } - - for (String jarAdded : jarsAdded) { - if (!jarExistsInFilesystem(group, jarAdded)) { - try { - sharedConfig.downloadJarFromOtherLocators(group, jarAdded); - } catch (Exception e) { - logger.error("Unable to add jar: " + jarAdded, e); - } - } - } - for (String jarRemoved : jarsRemoved) { File jar = sharedConfig.getPathToJarOnThisLocator(group, jarRemoved).toFile(); if (jar.exists()) { @@ -95,10 +92,25 @@ public class ConfigurationChangeListener extends CacheListenerAdapter<String, Co } } } - } - private boolean jarExistsInFilesystem(String groupName, String jarName) { - return sharedConfig.getPathToJarOnThisLocator(groupName, jarName).toFile().exists(); + String triggerMemberId = (String) event.getCallbackArgument(); + DistributedMember locator = getDistributedMember(triggerMemberId); + for (String jarAdded : newJars) { + try { + sharedConfig.downloadJarFromLocator(group, jarAdded, locator); + } catch (Exception e) { + logger.error("Unable to add jar: " + jarAdded, e); + } + } } + private DistributedMember getDistributedMember(String memberName) { + InternalCache cache = (InternalCache) CacheFactory.getAnyInstance(); + Set<DistributedMember> locators = new HashSet<>( + cache.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration().keySet()); + + Optional<DistributedMember> locator = + locators.stream().filter(x -> x.getId().equals(memberName)).findFirst(); + return locator.get(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java 
b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java index f4fbbb3..56225ca 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java @@ -15,6 +15,8 @@ */ package org.apache.geode.management.internal.configuration.functions; +import java.util.List; + import org.apache.logging.log4j.Logger; import org.apache.geode.cache.execute.Function; @@ -25,34 +27,31 @@ import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.internal.InternalEntity; import org.apache.geode.internal.logging.LogService; -public class UploadJarFunction implements Function, InternalEntity { +public class UploadJarFunction implements Function<Object[]>, InternalEntity { private static final Logger logger = LogService.getLogger(); private static final long serialVersionUID = 1L; @Override - public void execute(FunctionContext context) { + public void execute(FunctionContext<Object[]> context) { InternalLocator locator = (InternalLocator) Locator.getLocator(); - Object[] args = (Object[]) context.getArguments(); + Object[] args = context.getArguments(); String group = (String) args[0]; String jarName = (String) args[1]; + byte[] jarBytes = null; if (locator != null && group != null && jarName != null) { ClusterConfigurationService sharedConfig = locator.getSharedConfiguration(); if (sharedConfig != null) { try { - byte[] jarBytes = sharedConfig.getJarBytesFromThisLocator(group, jarName); + jarBytes = sharedConfig.getJarBytesFromThisLocator(group, jarName); context.getResultSender().lastResult(jarBytes); - } catch (Exception e) { logger.error(e); - context.getResultSender().sendException(e); + throw new IllegalStateException(e.getMessage()); } } } - - // TODO: Why does this not throw an IllegalStateException? 
- context.getResultSender().lastResult(null); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java index 92e8c9d..4ce7323 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.Serializable; import java.net.URL; import java.util.List; +import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -119,6 +120,31 @@ public class DeployCommandRedeployDUnitTest { } @Test + public void redeployJarsWithNewVersionsOfFunctionsAndMultipleLocators() throws Exception { + Properties props = new Properties(); + props.setProperty("locators", "localhost[" + locator.getPort() + "]"); + MemberVM locator2 = lsRule.startLocatorVM(2, props); + + gfshConnector.executeAndAssertThat("deploy --jar=" + jarAVersion1.getCanonicalPath()) + .statusIsSuccess(); + server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A)); + server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1)); + + + gfshConnector.executeAndAssertThat("deploy --jar=" + jarAVersion2.getCanonicalPath()) + .statusIsSuccess(); + server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A)); + server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2)); + + server.stopMember(false); + + lsRule.startServerVM(1, locator.getPort()); + + server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A)); + server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2)); + } + + @Test public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() throws Exception { 
gfshConnector.executeAndAssertThat("deploy --jar=" + jarAVersion1.getCanonicalPath()) .statusIsSuccess(); -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" <commits@geode.apache.org>.