This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7972b77  GEODE-4029: Deployed jars may not be correct when multiple locators a… (#1103)
7972b77 is described below

commit 7972b77d7790c63ee3f4d0a378a0efec38eaf372
Author: Jens Deppe <jde...@pivotal.io>
AuthorDate: Mon Dec 4 11:13:27 2017 -0800

    GEODE-4029: Deployed jars may not be correct when multiple locators a… (#1103)
    
    * GEODE-4029: Deployed jars may not be correct when multiple locators are in use
    
    * GEODE-4029: Review updates
---
 .../internal/ClusterConfigurationService.java      | 82 ++++++++++++++++------
 .../callbacks/ConfigurationChangeListener.java     | 68 ++++++++++--------
 .../configuration/functions/UploadJarFunction.java | 17 +++--
 .../commands/DeployCommandRedeployDUnitTest.java   | 26 +++++++
 4 files changed, 134 insertions(+), 59 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
index 16b86aa..beb6ae5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterConfigurationService.java
@@ -27,6 +27,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
@@ -35,7 +36,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -319,9 +319,12 @@ public class ClusterConfigurationService {
         // will need the jars on file to upload to other locators. Need to 
update the jars
         // using a new copy of the Configuration so that the change listener 
will pick up the jar
         // name changes.
+
+        String memberId = cache.getMyId().getId();
+
         Configuration configurationCopy = new Configuration(configuration);
         configurationCopy.addJarNames(jarNames);
-        configRegion.put(group, configurationCopy);
+        configRegion.put(group, configurationCopy, memberId);
       }
     } catch (Exception e) {
       success = false;
@@ -352,6 +355,20 @@ public class ClusterConfigurationService {
         if (configuration == null) {
           break;
         }
+
+        for (String jarRemoved : jarNames) {
+          File jar = this.getPathToJarOnThisLocator(group, 
jarRemoved).toFile();
+          if (jar.exists()) {
+            try {
+              FileUtils.forceDelete(jar);
+            } catch (IOException e) {
+              logger.error(
+                  "Exception occurred while attempting to delete a jar from 
the filesystem: {}",
+                  jarRemoved, e);
+            }
+          }
+        }
+
         Configuration configurationCopy = new Configuration(configuration);
         configurationCopy.removeJarNames(jarNames);
         configRegion.put(group, configurationCopy);
@@ -382,27 +399,58 @@ public class ClusterConfigurationService {
     return FileUtils.readFileToByteArray(jar);
   }
 
-  // used in the cluster config change listener when jarnames are changed in 
the internal region
+  // Only used when a locator is initially starting up
   public void downloadJarFromOtherLocators(String groupName, String jarName)
       throws IllegalStateException, IOException {
     logger.info("Getting Jar files from other locators");
     DM dm = this.cache.getDistributionManager();
     DistributedMember me = this.cache.getMyId();
-    Set<DistributedMember> locators =
-        new 
HashSet<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet());
+    List<DistributedMember> locators =
+        new 
ArrayList<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet());
     locators.remove(me);
 
     createConfigDirIfNecessary(groupName);
 
-    byte[] jarBytes = locators.stream()
-        .map((DistributedMember locator) -> downloadJarFromLocator(locator, 
groupName, jarName))
-        .filter(Objects::nonNull).findFirst().orElseThrow(() -> new 
IllegalStateException(
-            "No locators have a deployed jar named " + jarName + " in " + 
groupName));
+    if (locators.isEmpty()) {
+      throw new IllegalStateException(
+          "Request to download jar " + jarName + " but no other locators are 
present");
+    }
+
+    byte[] jarBytes = downloadJar(locators.get(0), groupName, jarName);
 
     File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
     FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);
   }
 
+  // used in the cluster config change listener when jarnames are changed in 
the internal region
+  public void downloadJarFromLocator(String groupName, String jarName,
+      DistributedMember sourceLocator) throws IllegalStateException, 
IOException {
+    logger.info("Downloading jar {} from locator {}", jarName, 
sourceLocator.getName());
+
+    createConfigDirIfNecessary(groupName);
+
+    byte[] jarBytes = downloadJar(sourceLocator, groupName, jarName);
+
+    if (jarBytes == null) {
+      throw new IllegalStateException("Could not download jar " + jarName + " 
in " + groupName
+          + " from " + sourceLocator.getName());
+    }
+
+    File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
+    FileUtils.writeByteArrayToFile(jarToWrite, jarBytes);
+  }
+
+  private byte[] downloadJar(DistributedMember locator, String groupName, 
String jarName) {
+    ResultCollector<byte[], List<byte[]>> rc =
+        (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new 
UploadJarFunction(),
+            new Object[] {groupName, jarName}, Collections.singleton(locator));
+
+    List<byte[]> result = rc.getResult();
+
+    // we should only get one byte[] back in the list
+    return result.get(0);
+  }
+
   // used when creating cluster config response
   public Map<String, byte[]> getAllJarsFromThisLocator(Set<String> groups) 
throws IOException {
     Map<String, byte[]> jarNamesToJarBytes = new HashMap<>();
@@ -609,7 +657,9 @@ public class ClusterConfigurationService {
       }
       Region<String, Configuration> clusterRegion = getConfigurationRegion();
       clusterRegion.clear();
-      clusterRegion.putAll(sharedConfiguration);
+
+      String memberId = cache.getMyId().getId();
+      clusterRegion.putAll(sharedConfiguration, memberId);
 
       // Overwrite the security settings using the locator's properties, 
ignoring whatever
       // in the import
@@ -656,18 +706,6 @@ public class ClusterConfigurationService {
     this.sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
   }
 
-  private byte[] downloadJarFromLocator(DistributedMember locator, String 
groupName,
-      String jarName) {
-    ResultCollector<byte[], List<byte[]>> rc =
-        (ResultCollector<byte[], List<byte[]>>) CliUtil.executeFunction(new 
UploadJarFunction(),
-            new Object[] {groupName, jarName}, Collections.singleton(locator));
-
-    List<byte[]> result = rc.getResult();
-
-    // we should only get one byte[] back in the list
-    return result.stream().filter(Objects::nonNull).findFirst().orElse(null);
-  }
-
   /**
    * Gets the region containing the shared configuration data. The region is 
created , if it does
    * not exist already. Note : this could block if this locator contains stale 
persistent
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
index c68664e..1c09334 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/callbacks/ConfigurationChangeListener.java
@@ -17,14 +17,22 @@ package org.apache.geode.management.internal.configuration.callbacks;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterConfigurationService;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.internal.configuration.domain.Configuration;
 
@@ -41,48 +49,37 @@ public class ConfigurationChangeListener extends CacheListenerAdapter<String, Co
     this.sharedConfig = sharedConfig;
   }
 
+  // Don't process the event locally. The action of adding or removing a jar 
should already have
+  // been performed by DeployCommand or UndeployCommand.
   @Override
   public void afterUpdate(EntryEvent<String, Configuration> event) {
     super.afterUpdate(event);
-    addOrRemoveJarFromFilesystem(event);
+    if (event.isOriginRemote()) {
+      addOrRemoveJarFromFilesystem(event);
+    }
   }
 
   @Override
   public void afterCreate(EntryEvent<String, Configuration> event) {
     super.afterCreate(event);
-    addOrRemoveJarFromFilesystem(event);
+    if (event.isOriginRemote()) {
+      addOrRemoveJarFromFilesystem(event);
+    }
   }
 
-  // when a new jar is added, if it does not exist in the current locator, 
download it from
-  // another locator.
-  // when a jar is removed, if it exists in the current locator, remove it.
+  // Here we first remove any jars which are not used anymore and then we 
re-add all of the
+  // necessary jars again. This may appear a bit blunt but it also accounts 
for the situation
+  // where a jar is only being updated - i.e. the name does not change, only 
the content.
   private void addOrRemoveJarFromFilesystem(EntryEvent<String, Configuration> 
event) {
     String group = event.getKey();
-    Configuration newConfig = (Configuration) event.getNewValue();
-    Configuration oldConfig = (Configuration) event.getOldValue();
+    Configuration newConfig = event.getNewValue();
+    Configuration oldConfig = event.getOldValue();
     Set<String> newJars = newConfig.getJarNames();
     Set<String> oldJars = (oldConfig == null) ? new HashSet<>() : 
oldConfig.getJarNames();
-    Set<String> jarsAdded = new HashSet<>(newJars);
-    Set<String> jarsRemoved = new HashSet<>(oldJars);
 
-    jarsAdded.removeAll(oldJars);
+    Set<String> jarsRemoved = new HashSet<>(oldJars);
     jarsRemoved.removeAll(newJars);
 
-    if (!jarsAdded.isEmpty() && !jarsRemoved.isEmpty()) {
-      throw new IllegalStateException(
-          "We don't expect to have jars both added and removed in one event");
-    }
-
-    for (String jarAdded : jarsAdded) {
-      if (!jarExistsInFilesystem(group, jarAdded)) {
-        try {
-          sharedConfig.downloadJarFromOtherLocators(group, jarAdded);
-        } catch (Exception e) {
-          logger.error("Unable to add jar: " + jarAdded, e);
-        }
-      }
-    }
-
     for (String jarRemoved : jarsRemoved) {
       File jar = sharedConfig.getPathToJarOnThisLocator(group, 
jarRemoved).toFile();
       if (jar.exists()) {
@@ -95,10 +92,25 @@ public class ConfigurationChangeListener extends CacheListenerAdapter<String, Co
         }
       }
     }
-  }
 
-  private boolean jarExistsInFilesystem(String groupName, String jarName) {
-    return sharedConfig.getPathToJarOnThisLocator(groupName, 
jarName).toFile().exists();
+    String triggerMemberId = (String) event.getCallbackArgument();
+    DistributedMember locator = getDistributedMember(triggerMemberId);
+    for (String jarAdded : newJars) {
+      try {
+        sharedConfig.downloadJarFromLocator(group, jarAdded, locator);
+      } catch (Exception e) {
+        logger.error("Unable to add jar: " + jarAdded, e);
+      }
+    }
   }
 
+  private DistributedMember getDistributedMember(String memberName) {
+    InternalCache cache = (InternalCache) CacheFactory.getAnyInstance();
+    Set<DistributedMember> locators = new HashSet<>(
+        
cache.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration().keySet());
+
+    Optional<DistributedMember> locator =
+        locators.stream().filter(x -> 
x.getId().equals(memberName)).findFirst();
+    return locator.get();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
index f4fbbb3..56225ca 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/functions/UploadJarFunction.java
@@ -15,6 +15,8 @@
  */
 package org.apache.geode.management.internal.configuration.functions;
 
+import java.util.List;
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.cache.execute.Function;
@@ -25,34 +27,31 @@ import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.InternalEntity;
 import org.apache.geode.internal.logging.LogService;
 
-public class UploadJarFunction implements Function, InternalEntity {
+public class UploadJarFunction implements Function<Object[]>, InternalEntity {
   private static final Logger logger = LogService.getLogger();
 
   private static final long serialVersionUID = 1L;
 
   @Override
-  public void execute(FunctionContext context) {
+  public void execute(FunctionContext<Object[]> context) {
     InternalLocator locator = (InternalLocator) Locator.getLocator();
-    Object[] args = (Object[]) context.getArguments();
+    Object[] args = context.getArguments();
     String group = (String) args[0];
     String jarName = (String) args[1];
 
+    byte[] jarBytes = null;
     if (locator != null && group != null && jarName != null) {
       ClusterConfigurationService sharedConfig = 
locator.getSharedConfiguration();
       if (sharedConfig != null) {
         try {
-          byte[] jarBytes = sharedConfig.getJarBytesFromThisLocator(group, 
jarName);
+          jarBytes = sharedConfig.getJarBytesFromThisLocator(group, jarName);
           context.getResultSender().lastResult(jarBytes);
-
         } catch (Exception e) {
           logger.error(e);
-          context.getResultSender().sendException(e);
+          throw new IllegalStateException(e.getMessage());
         }
       }
     }
-
-    // TODO: Why does this not throw an IllegalStateException?
-    context.getResultSender().lastResult(null);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
index 92e8c9d..4ce7323 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DeployCommandRedeployDUnitTest.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -119,6 +120,31 @@ public class DeployCommandRedeployDUnitTest {
   }
 
   @Test
+  public void redeployJarsWithNewVersionsOfFunctionsAndMultipleLocators() 
throws Exception {
+    Properties props = new Properties();
+    props.setProperty("locators", "localhost[" + locator.getPort() + "]");
+    MemberVM locator2 = lsRule.startLocatorVM(2, props);
+
+    gfshConnector.executeAndAssertThat("deploy --jar=" + 
jarAVersion1.getCanonicalPath())
+        .statusIsSuccess();
+    server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+    server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
+
+
+    gfshConnector.executeAndAssertThat("deploy --jar=" + 
jarAVersion2.getCanonicalPath())
+        .statusIsSuccess();
+    server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+    server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+
+    server.stopMember(false);
+
+    lsRule.startServerVM(1, locator.getPort());
+
+    server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
+    server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
+  }
+
+  @Test
   public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() throws 
Exception {
     gfshConnector.executeAndAssertThat("deploy --jar=" + 
jarAVersion1.getCanonicalPath())
         .statusIsSuccess();

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Reply via email to