[
https://issues.apache.org/jira/browse/GEODE-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16277291#comment-16277291
]
ASF GitHub Bot commented on GEODE-4029:
---------------------------------------
jdeppe-pivotal closed pull request #1103: GEODE-4029: Deployed jars may not be
correct when multiple locators are in use
URL: https://github.com/apache/geode/pull/1103
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 16b86aadc1..beb6ae5ddc 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -27,6 +27,7 @@
import java.io.StringWriter;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@@ -35,7 +36,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -319,9 +319,12 @@ public boolean addJarsToThisLocator(String[] jarNames,
byte[][] jarBytes, String
// will need the jars on file to upload to other locators. Need to
update the jars
// using a new copy of the Configuration so that the change listener
will pick up the jar
// name changes.
+
+ String memberId = cache.getMyId().getId();
+
Configuration configurationCopy = new Configuration(configuration);
configurationCopy.addJarNames(jarNames);
- configRegion.put(group, configurationCopy);
+ configRegion.put(group, configurationCopy, memberId);
}
} catch (Exception e) {
success = false;
@@ -352,6 +355,20 @@ public boolean removeJars(final String[] jarNames,
String[] groups) {
if (configuration == null) {
break;
}
+
+ for (String jarRemoved : jarNames) {
+ File jar = this.getPathToJarOnThisLocator(group,
jarRemoved).toFile();
+ if (jar.exists()) {
+ try {
+ FileUtils.forceDelete(jar);
+ } catch (IOException e) {
+ logger.error(
+ "Exception occurred while attempting to delete a jar from
the filesystem: {}",
+ jarRemoved, e);
+ }
+ }
+ }
+
Configuration configurationCopy = new Configuration(configuration);
configurationCopy.removeJarNames(jarNames);
configRegion.put(group, configurationCopy);
@@ -382,27 +399,58 @@ public boolean removeJars(final String[] jarNames,
String[] groups) {
return FileUtils.readFileToByteArray(jar);
}
- // used in the cluster config change listener when jarnames are changed in
the internal region
+ // Only used when a locator is initially starting up
public void downloadJarFromOtherLocators(String groupName, String jarName)
throws IllegalStateException, IOException {
logger.info("Getting Jar files from other locators");
DM dm = this.cache.getDistributionManager();
DistributedMember me = this.cache.getMyId();
- Set<DistributedMember> locators =
- new
HashSet<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet());
+ List<DistributedMember> locators =
+ new
ArrayList<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet());
locators.remove(me);
createConfigDirIfNecessary(groupName);
- byte[] jarBytes = locators.stream()
- .map((DistributedMember locator) -> downloadJarFromLocator(locator,
groupName, jarName))
- .filter(Objects::nonNull).findFirst().orElseThrow(() -> new
IllegalStateException(
- "No locators have a deployed jar named " + jarName + " in " +
groupName));
+ if (locators.isEmpty()) {
+ throw new IllegalStateException(
+ "Request to download jar " + jarName + " but no other locators are
present");
+ }
+
+ byte[] jarBytes = downloadJar(locators.get(0), groupName, jarName);
File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);
}
+ // used in the cluster config change listener when jarnames are changed in
the internal region
+ public void downloadJarFromLocator(String groupName, String jarName,
+ DistributedMember sourceLocator) throws IllegalStateException,
IOException {
+ logger.info("Downloading jar {} from locator {}", jarName,
sourceLocator.getName());
+
+ createConfigDirIfNecessary(groupName);
+
+ byte[] jarBytes = downloadJar(sourceLocator, groupName, jarName);
+
+ if (jarBytes == null) {
+ throw new IllegalStateException("Could not download jar " + jarName + "
in " + groupName
+ + " from " + sourceLocator.getName());
+ }
+
+ File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
+ FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);
+ }
+
+ private byte[] downloadJar(DistributedMember locator, String groupName,
String jarName) {
+ ResultCollector<byte[], List<byte[]>> rc =
+ (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new
UploadJarFunction(),
+ new Object[] {groupName, jarName}, Collections.singleton(locator));
+
+ List<byte[]> result = rc.getResult();
+
+ // we should only get one byte[] back in the list
+ return result.get(0);
+ }
+
// used when creating cluster config response
public Map<String, byte[]> getAllJarsFromThisLocator(Set<String> groups)
throws IOException {
Map<String, byte[]> jarNamesToJarBytes = new HashMap<>();
@@ -609,7 +657,9 @@ public void loadSharedConfigurationFromDisk()
}
Region<String, Configuration> clusterRegion = getConfigurationRegion();
clusterRegion.clear();
- clusterRegion.putAll(sharedConfiguration);
+
+ String memberId = cache.getMyId().getId();
+ clusterRegion.putAll(sharedConfiguration, memberId);
// Overwrite the security settings using the locator's properties,
ignoring whatever
// in the import
@@ -656,18 +706,6 @@ public void unlockSharedConfiguration() {
this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
}
- private byte[] downloadJarFromLocator(DistributedMember locator, String
groupName,
- String jarName) {
- ResultCollector<byte[], List<byte[]>> rc =
- (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new
UploadJarFunction(),
- new Object[] {groupName, jarName}, Collections.singleton(locator));
-
- List<byte[]> result = rc.getResult();
-
- // we should only get one byte[] back in the list
- return result.stream().filter(Objects::nonNull).findFirst().orElse(null);
- }
-
/**
* Gets the region containing the shared configuration data. The region is
created , if it does
* not exist already. Note : this could block if this locator contains stale
persistent
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
index c68664e568..1c09334369 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
@@ -17,14 +17,22 @@
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterConfigurationService;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.configuration.domain.Configuration;
@@ -41,48 +49,37 @@ public
ConfigurationChangeListener(ClusterConfigurationService sharedConfig) {
this.sharedConfig = sharedConfig;
}
+ // Don't process the event locally. The action of adding or removing a jar
should already have
+ // been performed by DeployCommand or UndeployCommand.
@Override
public void afterUpdate(EntryEvent<String, Configuration> event) {
super.afterUpdate(event);
- addOrRemoveJarFromFilesystem(event);
+ if (event.isOriginRemote()) {
+ addOrRemoveJarFromFilesystem(event);
+ }
}
@Override
public void afterCreate(EntryEvent<String, Configuration> event) {
super.afterCreate(event);
- addOrRemoveJarFromFilesystem(event);
+ if (event.isOriginRemote()) {
+ addOrRemoveJarFromFilesystem(event);
+ }
}
- // when a new jar is added, if it does not exist in the current locator,
download it from
- // another locator.
- // when a jar is removed, if it exists in the current locator, remove it.
+ // Here we first remove any jars which are not used anymore and then we
re-add all of the
+ // necessary jars again. This may appear a bit blunt but it also accounts
for the situation
+ // where a jar is only being updated - i.e. the name does not change, only
the content.
private void addOrRemoveJarFromFilesystem(EntryEvent<String, Configuration>
event) {
String group = event.getKey();
- Configuration newConfig = (Configuration) event.getNewValue();
- Configuration oldConfig = (Configuration) event.getOldValue();
+ Configuration newConfig = event.getNewValue();
+ Configuration oldConfig = event.getOldValue();
Set<String> newJars = newConfig.getJarNames();
Set<String> oldJars = (oldConfig == null) ? new HashSet<>() :
oldConfig.getJarNames();
- Set<String> jarsAdded = new HashSet<>(newJars);
- Set<String> jarsRemoved = new HashSet<>(oldJars);
- jarsAdded.removeAll(oldJars);
+ Set<String> jarsRemoved = new HashSet<>(oldJars);
jarsRemoved.removeAll(newJars);
- if (!jarsAdded.isEmpty() && !jarsRemoved.isEmpty()) {
- throw new IllegalStateException(
- "We don't expect to have jars both added and removed in one event");
- }
-
- for (String jarAdded : jarsAdded) {
- if (!jarExistsInFilesystem(group, jarAdded)) {
- try {
- sharedConfig.downloadJarFromOtherLocators(group, jarAdded);
- } catch (Exception e) {
- logger.error("Unable to add jar: " + jarAdded, e);
- }
- }
- }
-
for (String jarRemoved : jarsRemoved) {
File jar = sharedConfig.getPathToJarOnThisLocator(group,
jarRemoved).toFile();
if (jar.exists()) {
@@ -95,10 +92,25 @@ private void
addOrRemoveJarFromFilesystem(EntryEvent<String, Configuration> even
}
}
}
- }
- private boolean jarExistsInFilesystem(String groupName, String jarName) {
- return sharedConfig.getPathToJarOnThisLocator(groupName,
jarName).toFile().exists();
+ String triggerMemberId = (String) event.getCallbackArgument();
+ DistributedMember locator = getDistributedMember(triggerMemberId);
+ for (String jarAdded : newJars) {
+ try {
+ sharedConfig.downloadJarFromLocator(group, jarAdded, locator);
+ } catch (Exception e) {
+ logger.error("Unable to add jar: " + jarAdded, e);
+ }
+ }
}
+ private DistributedMember getDistributedMember(String memberName) {
+ InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
+ Set<DistributedMember> locators = new HashSet<>(
+
cache.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration().keySet());
+
+ Optional<DistributedMember> locator =
+ locators.stream().filter(x ->
x.getId().equals(memberName)).findFirst();
+ return locator.get();
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
index f4fbbb3982..56225ca094 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
@@ -15,6 +15,8 @@
*/
package org.apache.geode.management.internal.configuration.functions;
+import java.util.List;
+
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.execute.Function;
@@ -25,34 +27,31 @@
import org.apache.geode.internal.InternalEntity;
import org.apache.geode.internal.logging.LogService;
-public class UploadJarFunction implements Function, InternalEntity {
+public class UploadJarFunction implements Function<Object[]>, InternalEntity {
private static final Logger logger = LogService.getLogger();
private static final long serialVersionUID = 1L;
@Override
- public void execute(FunctionContext context) {
+ public void execute(FunctionContext<Object[]> context) {
InternalLocator locator = (InternalLocator) Locator.getLocator();
- Object[] args = (Object[]) context.getArguments();
+ Object[] args = context.getArguments();
String group = (String) args[0];
String jarName = (String) args[1];
+ byte[] jarBytes = null;
if (locator != null && group != null && jarName != null) {
ClusterConfigurationService sharedConfig =
locator.getSharedConfiguration();
if (sharedConfig != null) {
try {
- byte[] jarBytes = sharedConfig.getJarBytesFromThisLocator(group,
jarName);
+ jarBytes = sharedConfig.getJarBytesFromThisLocator(group, jarName);
context.getResultSender().lastResult(jarBytes);
-
} catch (Exception e) {
logger.error(e);
- context.getResultSender().sendException(e);
+ throw new IllegalStateException(e.getMessage());
}
}
}
-
- // TODO: Why does this not throw an IllegalStateException?
- context.getResultSender().lastResult(null);
}
@Override
diff --git
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
index 92e8c9d8bb..4ce732331d 100644
---
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.net.URL;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -118,6 +119,31 @@ public void redeployJarsWithNewVersionsOfFunctions()
throws Exception {
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION2));
}
+ @Test
+ public void redeployJarsWithNewVersionsOfFunctionsAndMultipleLocators()
throws Exception {
+ Properties props = new Properties();
+ props.setProperty("locators", "localhost[" + locator.getPort() + "]");
+ MemberVM locator2 = lsRule.startLocatorVM(2, props);
+
+ gfshConnector.executeAndAssertThat("deploy --jar=" +
jarAVersion1.getCanonicalPath())
+ .statusIsSuccess();
+ server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+ server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
+
+
+ gfshConnector.executeAndAssertThat("deploy --jar=" +
jarAVersion2.getCanonicalPath())
+ .statusIsSuccess();
+ server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+ server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+
+ server.stopMember(false);
+
+ lsRule.startServerVM(1, locator.getPort());
+
+ server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+ server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+ }
+
@Test
public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() throws
Exception {
gfshConnector.executeAndAssertThat("deploy --jar=" +
jarAVersion1.getCanonicalPath())
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Deployed jars may not be correct when multiple locators are in use
> ------------------------------------------------------------------
>
> Key: GEODE-4029
> URL: https://issues.apache.org/jira/browse/GEODE-4029
> Project: Geode
> Issue Type: Bug
> Components: gfsh
> Reporter: Jens Deppe
> Assignee: Jens Deppe
>
> When we have more than one locator, if a jar is re-deployed it will not be
> propagated correctly to the 'remote' locators, i.e. the locators which are
> not directly executing the 'deploy' command.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)