This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 8c4db7a9c KNOX-2942 - Miscellaneous HXR parser improvements (#779)
8c4db7a9c is described below
commit 8c4db7a9c46dbb9740141240c5ad943defdddc72
Author: Sandor Molnar <[email protected]>
AuthorDate: Mon Jul 24 10:41:33 2023 +0200
KNOX-2942 - Miscellaneous HXR parser improvements (#779)
---
.../org/apache/knox/gateway/GatewayServer.java | 6 ++--
.../hadoop/xml/HadoopXmlResourceMessages.java | 12 +++++--
.../hadoop/xml/HadoopXmlResourceMonitor.java | 37 ++++++++++++++--------
.../hadoop/xml/HadoopXmlResourceParser.java | 24 ++++----------
.../hadoop/xml/HadoopXmlResourceParserTest.java | 8 -----
5 files changed, 42 insertions(+), 45 deletions(-)
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
index 65fefd50d..8485ec86d 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/GatewayServer.java
@@ -625,10 +625,10 @@ public class GatewayServer {
File topologiesDir = calculateAbsoluteTopologiesDir();
monitor = services.getService(ServiceType.TOPOLOGY_SERVICE);
- // Descriptors should be reloaded before topology reloading at startup, so
that any changes to descriptors
+ // Shared providers and descriptors should be reloaded before topology
reloading at startup, so that any changes to descriptors
// will be realized before Knox deploys "old" topologies that would have
re-deployed anyway in a matter of seconds
// by the descriptor monitor
- monitor.reloadDescriptors();
+ handleHadoopXmlResources();
monitor.addTopologyChangeListener(listener);
log.loadingTopologiesFromDirectory(topologiesDir.getAbsolutePath());
@@ -700,8 +700,6 @@ public class GatewayServer {
cleanupTopologyDeployments();
- handleHadoopXmlResources();
-
// Start the topology monitor.
monitor.startMonitor();
diff --git a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMessages.java b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMessages.java
index 6390e7f5b..63d92dfcb 100644
--- a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMessages.java
+++ b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMessages.java
@@ -16,6 +16,8 @@
*/
package org.apache.knox.gateway.topology.hadoop.xml;
+import java.nio.file.attribute.FileTime;
+
import org.apache.knox.gateway.i18n.messages.Message;
import org.apache.knox.gateway.i18n.messages.MessageLevel;
import org.apache.knox.gateway.i18n.messages.Messages;
@@ -30,8 +32,8 @@ public interface HadoopXmlResourceMessages {
@Message(level = MessageLevel.INFO, text = "Monitoring Knox resources in
Hadoop style XML configurations is disabled.")
void disableMonitoringHadoopXmlResources();
- @Message(level = MessageLevel.INFO, text = "Parsing Knox resources in
Hadoop style XML {0}. Looking up {1}...")
- void parseHadoopXmlResource(String path, String topologyName);
+ @Message(level = MessageLevel.INFO, text = "Parsing Knox resources in
Hadoop style XML {0}...")
+ void parseHadoopXmlResource(String path);
@Message(level = MessageLevel.INFO, text = "Found Knox descriptors {0} in
{1}")
void foundKnoxDescriptors(String descriptorList, String path);
@@ -54,6 +56,12 @@ public interface HadoopXmlResourceMessages {
@Message(level = MessageLevel.ERROR, text = "Parsing XML configuration {0}
failed: {1}")
void failedToParseXmlConfiguration(String path, String errorMessage,
@StackTrace(level = MessageLevel.DEBUG) Exception e);
+ @Message(level = MessageLevel.DEBUG, text = "Processing Hadoop XML resource
{0} (force = {1}; lastReloadTime = {2}; lastModified = {3})")
+ void processHadoopXmlResource(String descriptorPath, boolean force, FileTime
lastReloadTime, FileTime lastModifiedTime);
+
+ @Message(level = MessageLevel.DEBUG, text = "Skipping Hadoop XML resource
monitoring of {0} (force = {1}; lastReloadTime = {2}; lastModified = {3})")
+ void skipMonitorHadoopXmlResource(String descriptorPath, boolean force,
FileTime lastReloadTime, FileTime lastModifiedTime);
+
@Message(level = MessageLevel.ERROR, text = "Error while monitoring Hadoop
style XML configuration {0}: {1}")
void failedToMonitorHadoopXmlResource(String descriptorPath, String
errorMessage, @StackTrace(level = MessageLevel.DEBUG) Exception e);
diff --git a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMonitor.java b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMonitor.java
index 674724fe0..79c494e15 100644
--- a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMonitor.java
+++ b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceMonitor.java
@@ -24,11 +24,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.SuffixFileFilter;
@@ -52,50 +56,57 @@ public class HadoopXmlResourceMonitor implements AdvancedServiceDiscoveryConfigC
private final String descriptorsDir;
private final long monitoringInterval;
private final HadoopXmlResourceParser hadoopXmlResourceParser;
- private FileTime lastReloadTime;
+ private final Map<Path, FileTime> lastReloadTimes;
+ private final Lock monitorLock = new ReentrantLock();
public HadoopXmlResourceMonitor(GatewayConfig gatewayConfig,
HadoopXmlResourceParser hadoopXmlResourceParser) {
this.hadoopXmlResourceParser = hadoopXmlResourceParser;
this.sharedProvidersDir = gatewayConfig.getGatewayProvidersConfigDir();
this.descriptorsDir = gatewayConfig.getGatewayDescriptorsDir();
this.monitoringInterval =
gatewayConfig.getClouderaManagerDescriptorsMonitoringInterval();
+ this.lastReloadTimes = new ConcurrentHashMap<>();
}
public void setupMonitor() {
if (monitoringInterval > 0) {
final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(new
BasicThreadFactory.Builder().namingPattern("ClouderaManagerDescriptorMonitor-%d").build());
- executorService.scheduleAtFixedRate(() ->
monitorClouderaManagerDescriptors(null), 0, monitoringInterval,
TimeUnit.MILLISECONDS);
+ executorService.scheduleAtFixedRate(() ->
monitorClouderaManagerDescriptors(false), 0, monitoringInterval,
TimeUnit.MILLISECONDS);
LOG.monitoringHadoopXmlResources(descriptorsDir);
- } else {
- LOG.disableMonitoringHadoopXmlResources();
}
}
- private void monitorClouderaManagerDescriptors(String topologyName) {
+ private void monitorClouderaManagerDescriptors(boolean force) {
final File[] clouderaManagerDescriptorFiles = new
File(descriptorsDir).listFiles((FileFilter) new
SuffixFileFilter(HADOOP_XML_RESOURCE_FILE_EXTENSION));
for (File clouderaManagerDescriptorFile : clouderaManagerDescriptorFiles) {
-
monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()),
topologyName);
+
monitorClouderaManagerDescriptor(Paths.get(clouderaManagerDescriptorFile.getAbsolutePath()),
force);
}
}
- private void monitorClouderaManagerDescriptor(Path
clouderaManagerDescriptorFile, String topologyName) {
+ private void monitorClouderaManagerDescriptor(Path
clouderaManagerDescriptorFile, boolean force) {
+ monitorLock.lock();
try {
if (Files.isReadable(clouderaManagerDescriptorFile)) {
final FileTime lastModifiedTime =
Files.getLastModifiedTime(clouderaManagerDescriptorFile);
- if (topologyName != null || lastReloadTime == null ||
lastReloadTime.compareTo(lastModifiedTime) < 0) {
- lastReloadTime = lastModifiedTime;
-
processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString(),
topologyName);
+ FileTime lastReloadTime =
lastReloadTimes.get(clouderaManagerDescriptorFile);
+ if (force || lastReloadTime == null ||
lastReloadTime.compareTo(lastModifiedTime) < 0) {
+ lastReloadTimes.put(clouderaManagerDescriptorFile, lastModifiedTime);
+
LOG.processHadoopXmlResource(clouderaManagerDescriptorFile.toString(), force,
lastReloadTime, lastModifiedTime);
+
processClouderaManagerDescriptor(clouderaManagerDescriptorFile.toString());
+ } else {
+
LOG.skipMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(),
force, lastReloadTime, lastModifiedTime);
}
} else {
LOG.failedToMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(),
"File is not readable!", null);
}
} catch (IOException e) {
LOG.failedToMonitorHadoopXmlResource(clouderaManagerDescriptorFile.toString(),
e.getMessage(), e);
+ } finally {
+ monitorLock.unlock();
}
}
- private void processClouderaManagerDescriptor(String descriptorFilePath,
String topologyName) {
- final HadoopXmlResourceParserResult result =
hadoopXmlResourceParser.parse(descriptorFilePath, topologyName);
+ private void processClouderaManagerDescriptor(String descriptorFilePath) {
+ final HadoopXmlResourceParserResult result =
hadoopXmlResourceParser.parse(descriptorFilePath);
processSharedProviders(result);
processDescriptors(result);
processDeleted(descriptorsDir, result.getDeletedDescriptors(), ".json");
@@ -160,6 +171,6 @@ public class HadoopXmlResourceMonitor implements AdvancedServiceDiscoveryConfigC
if (StringUtils.isBlank(topologyName)) {
throw new IllegalArgumentException("Invalid advanced service discovery
configuration: topology name is missing!");
}
- monitorClouderaManagerDescriptors(topologyName);
+ monitorClouderaManagerDescriptors(true);
}
}
diff --git a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParser.java b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParser.java
index d00d358fc..4e1dfce60 100644
--- a/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParser.java
+++ b/gateway-topology-hadoop-xml/src/main/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParser.java
@@ -82,33 +82,21 @@ public class HadoopXmlResourceParser implements AdvancedServiceDiscoveryConfigCh
this.sharedProvidersDir = gatewayConfig.getGatewayProvidersConfigDir();
}
- /**
- * Produces a set of {@link SimpleDescriptor}s from the specified file.
Parses ALL descriptors listed in the given file.
- *
- * @param path
- * The path to the configuration file which holds descriptor
information in a pre-defined format.
- * @return A SimpleDescriptor based on the contents of the given file.
- */
- public HadoopXmlResourceParserResult parse(String path) {
- return parse(path, null);
- }
-
/**
* Produces a set of {@link SimpleDescriptor}s from the specified file.
*
* @param path
* The path to the configuration file which holds descriptor
information in a pre-defined format.
- * @param topologyName
* if set, the parser should only parse a descriptor with the same
name
* @return A SimpleDescriptor based on the contents of the given file.
*/
- public HadoopXmlResourceParserResult parse(String path, String topologyName)
{
+ public HadoopXmlResourceParserResult parse(String path) {
try {
- log.parseHadoopXmlResource(path, topologyName == null ? "all topologies"
: topologyName);
+ log.parseHadoopXmlResource(path);
final Configuration xmlConfiguration = new Configuration(false);
xmlConfiguration.addResource(Paths.get(path).toUri().toURL());
xmlConfiguration.reloadConfiguration();
- final HadoopXmlResourceParserResult parserResult =
parseXmlConfig(xmlConfiguration, topologyName);
+ final HadoopXmlResourceParserResult parserResult =
parseXmlConfig(xmlConfiguration);
logParserResult(path, parserResult);
return parserResult;
} catch (Exception e) {
@@ -132,7 +120,7 @@ public class HadoopXmlResourceParser implements AdvancedServiceDiscoveryConfigCh
}
}
- private HadoopXmlResourceParserResult parseXmlConfig(Configuration
xmlConfiguration, String topologyName) {
+ private HadoopXmlResourceParserResult parseXmlConfig(Configuration
xmlConfiguration) {
final Map<String, ProviderConfiguration> providers = new LinkedHashMap<>();
final Set<SimpleDescriptor> descriptors = new LinkedHashSet<>();
Set<String> deletedDescriptors = new HashSet<>();
@@ -143,8 +131,8 @@ public class HadoopXmlResourceParser implements AdvancedServiceDiscoveryConfigCh
final String[] providerConfigurations =
xmlConfigurationKey.replace(CONFIG_NAME_PROVIDER_CONFIGS_PREFIX, "").split(",");
Arrays.stream(providerConfigurations).map(String::trim).forEach(providerConfigurationName
->
parseProvider(providerConfigurationName,
xmlDescriptor.getValue(), providers, deletedProviders));
- } else if (topologyName == null ||
xmlConfigurationKey.equals(topologyName)) {
- parseDescriptor(xmlConfigurationKey, xmlDescriptor.getValue(),
descriptors, deletedDescriptors);
+ } else {
+ parseDescriptor(xmlConfigurationKey, xmlDescriptor.getValue(),
descriptors, deletedDescriptors);
}
});
return new HadoopXmlResourceParserResult(providers, descriptors,
deletedDescriptors, deletedProviders);
diff --git a/gateway-topology-hadoop-xml/src/test/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParserTest.java b/gateway-topology-hadoop-xml/src/test/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParserTest.java
index 1ee49cbcd..bca77c815 100644
--- a/gateway-topology-hadoop-xml/src/test/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParserTest.java
+++ b/gateway-topology-hadoop-xml/src/test/java/org/apache/knox/gateway/topology/hadoop/xml/HadoopXmlResourceParserTest.java
@@ -111,14 +111,6 @@ public class HadoopXmlResourceParserTest {
assertNotNull(parserResult.getProviders().get("admin"));
}
- @Test
- public void testCMDescriptorParserOnlyTopology2() throws Exception {
- final String testConfigPath =
this.getClass().getClassLoader().getResource("testDescriptor.xml").getPath();
- final Set<SimpleDescriptor> descriptors =
hadoopXmlResourceParser.parse(testConfigPath, "topology2").getDescriptors();
- assertEquals(1, descriptors.size());
- validateTopology2Descriptors(descriptors.iterator().next(), true);
- }
-
@Test
public void testCMDescriptorParserWrongDescriptorContent() throws Exception {
final String testConfigPath =
this.getClass().getClassLoader().getResource("testDescriptorConfigurationWithWrongDescriptor.xml").getPath();