Repository: stratos Updated Branches: refs/heads/data-publisher-integration f704fa3da -> b530e9de3
Refactoring Monitoring Service Classes Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b530e9de Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b530e9de Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b530e9de Branch: refs/heads/data-publisher-integration Commit: b530e9de31e16075f8c3505d35e95c2ec4d50429 Parents: f704fa3 Author: Thanuja <[email protected]> Authored: Wed Aug 12 18:16:54 2015 +0530 Committer: Thanuja <[email protected]> Committed: Wed Aug 12 18:16:54 2015 +0530 ---------------------------------------------------------------------- .../publisher/HealthStatisticsPublisher.java | 4 +- .../publisher/InFlightRequestPublisher.java | 4 +- .../publisher/StatisticsPublisherType.java | 2 +- .../publisher/ThriftClientConfig.java | 81 +++++++++++ .../publisher/ThriftClientConfigParser.java | 139 +++++++++++++++++++ .../statistics/publisher/ThriftClientInfo.java | 63 +++++++++ .../publisher/ThriftStatisticsPublisher.java | 115 +++++++++++++++ .../publisher/wso2/cep/ThriftClientConfig.java | 81 ----------- .../wso2/cep/ThriftClientConfigParser.java | 139 ------------------- .../publisher/wso2/cep/ThriftClientInfo.java | 63 --------- .../cep/WSO2CEPHealthStatisticsPublisher.java | 26 ++-- .../cep/WSO2CEPInFlightRequestPublisher.java | 28 ++-- .../wso2/cep/WSO2CEPStatisticsPublisher.java | 114 --------------- .../test/ThriftClientConfigParserTest.java | 13 +- .../cartridge.agent/healthstats.py | 4 +- .../streamdefinitions/stream-manager-config.xml | 4 +- 16 files changed, 447 insertions(+), 433 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java index 95b04ff..6af1317 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java @@ -27,7 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { /** * Publish health statistics to complex event processor. * - * @param timeStamp time + * @param timestamp Time * @param clusterId Cluster id of the member * @param clusterInstanceId Cluster instance id of the member * @param networkPartitionId Network partition id of the member @@ -36,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { * @param health Health type: memory_consumption | load_average * @param value Health type value */ - void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value); } http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java index af9c8e9..e4e65c0 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java @@ -27,12 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher { /** * Publish in-flight request count. * - * @param timeStamp time + * @param timestamp Time * @param clusterId Cluster id * @param clusterInstanceId Cluster instance id * @param networkPartitionId Network partition id of the cluster * @param inFlightRequestCount In-flight request count of the cluster */ - void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount); } http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java index 77c5b78..d4b9a87 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java @@ -23,5 +23,5 @@ package org.apache.stratos.common.statistics.publisher; * Statistics publisher type enumneration. */ public enum StatisticsPublisherType { - WSO2CEP + WSO2CEP, WSO2DAS } http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java new file mode 100644 index 0000000..25ee897 --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.statistics.publisher; + + +import org.apache.commons.lang.StringUtils; + +/** + * Thrift Client configuration. + */ +public class ThriftClientConfig { + + public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path"; + + private static volatile ThriftClientConfig instance; + private ThriftClientInfo thriftClientInfo; + + /* + * A private Constructor prevents any other + * class from instantiating. + */ + ThriftClientConfig() { + } + + public static ThriftClientConfig getInstance() { + if (instance == null) { + synchronized (ThriftClientConfig.class) { + if (instance == null) { + String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH); + if (StringUtils.isBlank(configFilePath)) { + throw new RuntimeException(String.format("Thrift client configuration file path system " + + "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH)); + } + instance = ThriftClientConfigParser.parse(configFilePath); + } + } + } + return instance; + } + + /** + * Returns an ThriftClientInfo Object that stores the credential information. + * Thrift client credential information can be found under thrift-client-config.xml file + * These credential information then get parsed and assigned into ThriftClientInfo + * Object. + * <p/> + * This method is used to return the assigned values in ThriftClientInfo Object + * + * @return ThriftClientInfo object which consists of username,password,ip and port values + */ + public ThriftClientInfo getThriftClientInfo() { + return thriftClientInfo; + } + + /** + * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken + * from thrift-client-config.xml file. + * + * @param thriftClientInfo Object of the ThriftClientInfo + */ + public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) { + this.thriftClientInfo = thriftClientInfo; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java new file mode 100644 index 0000000..aa05d6f --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.statistics.publisher; + +import org.apache.axiom.om.OMElement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.util.AxiomXpathParserUtil; +import org.wso2.securevault.SecretResolver; +import org.wso2.securevault.SecretResolverFactory; + +import java.io.File; +import java.util.Iterator; + +/** + * Thrift client config parser. + */ +public class ThriftClientConfigParser { + + private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class); + + /** + * Fields to be read from the thrift-client-config.xml file + */ + private static final String USERNAME_ELEMENT = "username"; + private static final String PASSWORD_ELEMENT = "password"; + private static final String IP_ELEMENT = "ip"; + private static final String PORT_ELEMENT = "port"; + + /** + * This method reads thrift-client-config.xml file and assign necessary credential + * values into thriftClientInfo object. A singleton design has been implemented + * with the use of thriftClientIConfig class. + * <p/> + * The filePath argument is the path to thrift-client-config.xml file + * + * @param filePath the path to thrift-client-config.xml file + * @return ThriftClientConfig object + */ + public static ThriftClientConfig parse(String filePath) { + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Parsing thrift client config file: %s", filePath)); + } + + ThriftClientConfig thriftClientIConfig = new ThriftClientConfig(); + ThriftClientInfo thriftClientInfo = new ThriftClientInfo(); + thriftClientIConfig.setThriftClientInfo(thriftClientInfo); + + File configFile = new File(filePath); + if (!configFile.exists()) { + throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath)); + } + OMElement document = AxiomXpathParserUtil.parse(configFile); + Iterator thriftClientIterator = document.getChildElements(); + + //Initialize the SecretResolver providing the configuration element. + SecretResolver secretResolver = SecretResolverFactory.create(document, false); + + String userNameValuesStr = null; + String passwordValueStr = null; + String ipValuesStr = null; + String portValueStr = null; + + //same entry used in cipher-text.properties and cipher-tool.properties. + String secretAlias = "thrift.client.configuration.password"; + + // Iterate the thrift-client-config.xml file and read child element + // consists of credential information necessary for ThriftStatisticsPublisher + while (thriftClientIterator.hasNext()) { + OMElement thriftClientElement = (OMElement) thriftClientIterator.next(); + + if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { + userNameValuesStr = thriftClientElement.getText(); + thriftClientInfo.setUsername(userNameValuesStr); + } + //password field protected using Secure vault + if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { + if ((secretResolver != null) && (secretResolver.isInitialized())) { + if (secretResolver.isTokenProtected(secretAlias)) { + passwordValueStr = secretResolver.resolve(secretAlias); + } else { + passwordValueStr = thriftClientElement.getText(); + } + } else { + passwordValueStr = thriftClientElement.getText(); + } + thriftClientInfo.setPassword(passwordValueStr); + } + + if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { + ipValuesStr = thriftClientElement.getText(); + thriftClientInfo.setIp(ipValuesStr); + } + + if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { + portValueStr = thriftClientElement.getText(); + thriftClientInfo.setPort(portValueStr); + } + } + + if (userNameValuesStr == null) { + throw new RuntimeException("Username value not found in thrift client configuration"); + } + if (passwordValueStr == null) { + throw new RuntimeException("Password not found in thrift client configuration "); + } + + if (ipValuesStr == null) { + throw new RuntimeException("Ip values not found in thrift client configuration "); + } + + if (portValueStr == null) { + throw new RuntimeException("Port not found in thrift client configuration "); + } + + return thriftClientIConfig; + } catch (Exception e) { + throw new RuntimeException("Could not parse thrift client configuration", e); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java new file mode 100644 index 0000000..514d907 --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.statistics.publisher; + +/** + * Thrift Client Info + */ +public class ThriftClientInfo { + private String username; + private String password; + private String ip; + private String port; + + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public String getPort() { + return port; + } + + public void setPort(String port) { + this.port = port; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java new file mode 100644 index 0000000..6a9e955 --- /dev/null +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.common.statistics.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.databridge.agent.thrift.Agent; +import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; +import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; + +import java.util.HashMap; + +/** + * Thrift statistics publisher. + */ +public class ThriftStatisticsPublisher implements StatisticsPublisher { + + private static final Log log = LogFactory.getLog(ThriftStatisticsPublisher.class); + + private StreamDefinition streamDefinition; + private AsyncDataPublisher asyncDataPublisher; + private String ip; + private String port; + private String username; + private String password; + private boolean enabled = false; + + /** + * Credential information stored inside thrift-client-config.xml file + * is parsed and assigned into ip,port,username and password fields + * + * @param streamDefinition Thrift Event Stream Definition + */ + public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String statsPublisherEnabled) { + ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance(); + ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo(); + + this.streamDefinition = streamDefinition; + this.ip = thriftClientInfo.getIp(); + this.port = thriftClientInfo.getPort(); + this.username = thriftClientInfo.getUsername(); + this.password = thriftClientInfo.getPassword(); + + enabled = Boolean.getBoolean(statsPublisherEnabled); + if (enabled) { + init(); + } + } + + private void init() { + AgentConfiguration agentConfiguration = new AgentConfiguration(); + Agent agent = new Agent(agentConfiguration); + + // Initialize asynchronous data publisher + asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent); + asyncDataPublisher.addStreamDefinition(streamDefinition); + } + + @Override + public void setEnabled(boolean enabled) { + this.enabled = enabled; + if (this.enabled) { + init(); + } + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void publish(Object[] payload) { + if (!isEnabled()) { + throw new RuntimeException("Statistics publisher is not enabled"); + } + + Event event = new Event(); + event.setPayloadData(payload); + event.setArbitraryDataMap(new HashMap<String, String>()); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing thrift event: [stream] %s [version] %s", + streamDefinition.getName(), streamDefinition.getVersion())); + } + asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); + } catch (AgentException e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish thrift event: [stream] %s [version] %s", + streamDefinition.getName(), streamDefinition.getVersion()), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java deleted file mode 100644 index 178c5a6..0000000 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.common.statistics.publisher.wso2.cep; - - -import org.apache.commons.lang.StringUtils; - -/** - * Thrift Client configuration. - */ -public class ThriftClientConfig { - - public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path"; - - private static volatile ThriftClientConfig instance; - private ThriftClientInfo thriftClientInfo; - - /* - * A private Constructor prevents any other - * class from instantiating. - */ - ThriftClientConfig() { - } - - public static ThriftClientConfig getInstance() { - if (instance == null) { - synchronized (ThriftClientConfig.class) { - if (instance == null) { - String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH); - if (StringUtils.isBlank(configFilePath)) { - throw new RuntimeException(String.format("Thrift client configuration file path system " + - "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH)); - } - instance = ThriftClientConfigParser.parse(configFilePath); - } - } - } - return instance; - } - - /** - * Returns an ThriftClientInfo Object that stores the credential information. - * CEP credential information can be found under thrift-client-config.xml file - * These credential information then get parsed and assigned into ThriftClientInfo - * Object. - * <p/> - * This method is used to return the assigned values in ThriftClientInfo Object - * - * @return ThriftClientInfo object which consists of username,password,ip and port values - */ - public ThriftClientInfo getThriftClientInfo() { - return thriftClientInfo; - } - - /** - * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken - * from thrift-client-config.xml file. - * - * @param thriftClientInfo Object of the ThriftClientInfo - */ - public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) { - this.thriftClientInfo = thriftClientInfo; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java deleted file mode 100644 index ae3c692..0000000 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.common.statistics.publisher.wso2.cep; - -import org.apache.axiom.om.OMElement; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.util.AxiomXpathParserUtil; -import org.wso2.securevault.SecretResolver; -import org.wso2.securevault.SecretResolverFactory; - -import java.io.File; -import java.util.Iterator; - -/** - * Thrift client config parser. - */ -public class ThriftClientConfigParser { - - private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class); - - /** - * Fields to be read from the thrift-client-config.xml file - */ - private static final String USERNAME_ELEMENT = "username"; - private static final String PASSWORD_ELEMENT = "password"; - private static final String IP_ELEMENT = "ip"; - private static final String PORT_ELEMENT = "port"; - - /** - * This method reads thrift-client-config.xml file and assign necessary credential - * values into thriftClientInfo object. A singleton design has been implemented - * with the use of thriftClientIConfig class. - * <p/> - * The filePath argument is the path to thrift-client-config.xml file - * - * @param filePath the path to thrift-client-config.xml file - * @return ThriftClientConfig object - */ - public static ThriftClientConfig parse(String filePath) { - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Parsing thrift client config file: %s", filePath)); - } - - ThriftClientConfig thriftClientIConfig = new ThriftClientConfig(); - ThriftClientInfo thriftClientInfo = new ThriftClientInfo(); - thriftClientIConfig.setThriftClientInfo(thriftClientInfo); - - File configFile = new File(filePath); - if (!configFile.exists()) { - throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath)); - } - OMElement document = AxiomXpathParserUtil.parse(configFile); - Iterator thriftClientIterator = document.getChildElements(); - - //Initialize the SecretResolver providing the configuration element. - SecretResolver secretResolver = SecretResolverFactory.create(document, false); - - String userNameValuesStr = null; - String passwordValueStr = null; - String ipValuesStr = null; - String portValueStr = null; - - //same entry used in cipher-text.properties and cipher-tool.properties. - String secretAlias = "thrift.client.configuration.password"; - - // Iterate the thrift-client-config.xml file and read child element - // consists of credential information necessary for WSO2CEPStatisticsPublisher - while (thriftClientIterator.hasNext()) { - OMElement thriftClientElement = (OMElement) thriftClientIterator.next(); - - if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { - userNameValuesStr = thriftClientElement.getText(); - thriftClientInfo.setUsername(userNameValuesStr); - } - //password field protected using Secure vault - if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { - if ((secretResolver != null) && (secretResolver.isInitialized())) { - if (secretResolver.isTokenProtected(secretAlias)) { - passwordValueStr = secretResolver.resolve(secretAlias); - } else { - passwordValueStr = thriftClientElement.getText(); - } - } else { - passwordValueStr = thriftClientElement.getText(); - } - thriftClientInfo.setPassword(passwordValueStr); - } - - if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { - ipValuesStr = thriftClientElement.getText(); - thriftClientInfo.setIp(ipValuesStr); - } - - if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) { - portValueStr = thriftClientElement.getText(); - thriftClientInfo.setPort(portValueStr); - } - } - - if (userNameValuesStr == null) { - throw new RuntimeException("Username value not found in thrift client configuration"); - } - if (passwordValueStr == null) { - throw new RuntimeException("Password not found in thrift client configuration "); - } - - if (ipValuesStr == null) { - throw new RuntimeException("Ip values not found in thrift client configuration "); - } - - if (portValueStr == null) { - throw new RuntimeException("Port not found in thrift client configuration "); - } - - return thriftClientIConfig; - } catch (Exception e) { - throw new RuntimeException("Could not parse thrift client configuration", e); - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java deleted file mode 100644 index 1a9ba81..0000000 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.common.statistics.publisher.wso2.cep; - -/** - * Thrift Client Info - */ -public class ThriftClientInfo { - private String username; - private String password; - private String ip; - private String port; - - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getPort() { - return port; - } - - public void setPort(String port) { - this.port = port; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java index d5c9265..33cf0b5 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java @@ -22,6 +22,7 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher; +import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher; import org.wso2.carbon.databridge.commons.Attribute; import org.wso2.carbon.databridge.commons.AttributeType; import org.wso2.carbon.databridge.commons.StreamDefinition; @@ -32,15 +33,16 @@ import java.util.List; /** * Health statistics publisher for publishing statistics to WSO2 CEP. */ -public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher implements HealthStatisticsPublisher { +public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher { private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class); + private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled"; private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats"; private static final String VERSION = "1.0.0"; public WSO2CEPHealthStatisticsPublisher() { - super(createStreamDefinition()); + super(createStreamDefinition(), STATS_PUBLISHER_ENABLED); } private static StreamDefinition createStreamDefinition() { @@ -71,17 +73,17 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher /** * Publish health statistics to cep. * - * @param timeStamp - * @param clusterId - * @param clusterInstanceId - * @param networkPartitionId - * @param memberId - * @param partitionId - * @param health - * @param value + * @param timestamp Time + * @param clusterId Cluster id of the member + * @param clusterInstanceId Cluster instance id of the member + * @param networkPartitionId Network partition id of the member + * @param memberId Member id + * @param partitionId Partition id of the member + * @param health Health type: memory_consumption | load_average + * @param value Health type value */ @Override - public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) { if (log.isDebugEnabled()) { log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " + @@ -90,7 +92,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher } // Set payload values List<Object> payload = new ArrayList<Object>(); - payload.add(timeStamp); + payload.add(timestamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java index f51eb91..f853f23 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java @@ -19,7 +19,10 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher; +import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher; import org.wso2.carbon.databridge.commons.Attribute; import org.wso2.carbon.databridge.commons.AttributeType; import org.wso2.carbon.databridge.commons.StreamDefinition; @@ -33,13 +36,15 @@ import java.util.List; * In-flight request count: * Number of requests being served at a given moment could be identified as in-flight request count. */ -public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher implements InFlightRequestPublisher { +public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher { + private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class); + private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled"; private static final String DATA_STREAM_NAME = "in_flight_requests"; private static final String VERSION = "1.0.0"; public WSO2CEPInFlightRequestPublisher() { - super(createStreamDefinition()); + super(createStreamDefinition(), STATS_PUBLISHER_ENABLED); } private static StreamDefinition createStreamDefinition() { @@ -66,18 +71,23 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher /** * Publish in-flight request count of a cluster. * - * @param timeStamp - * @param clusterId - * @param clusterInstanceId - * @param networkPartitionId - * @param inFlightRequestCount + * @param timestamp Time + * @param clusterId Cluster id + * @param clusterInstanceId Cluster instance id + * @param networkPartitionId Network partition id of the cluster + * @param inFlightRequestCount In-flight request count of the cluster */ @Override - public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing health statistics: [timestamp] %d [cluster] %s " + + "[cluster-instance] %s [network-partition] %s [in-flight-request-count] %d", + timestamp, clusterId, clusterInstanceId, networkPartitionId, inFlightRequestCount)); + } // Set payload values List<Object> payload = new ArrayList<Object>(); - payload.add(timeStamp); + payload.add(timestamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java deleted file mode 100644 index 653288d..0000000 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.common.statistics.publisher.wso2.cep; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.statistics.publisher.StatisticsPublisher; -import org.wso2.carbon.databridge.agent.thrift.Agent; -import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; -import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration; -import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; -import org.wso2.carbon.databridge.commons.Event; -import org.wso2.carbon.databridge.commons.StreamDefinition; - -import java.util.HashMap; - -/** - * WSO2 CEP statistics publisher. - */ -public class WSO2CEPStatisticsPublisher implements StatisticsPublisher { - - private static final Log log = LogFactory.getLog(WSO2CEPStatisticsPublisher.class); - - private StreamDefinition streamDefinition; - private AsyncDataPublisher asyncDataPublisher; - private String ip; - private String port; - private String username; - private String password; - private boolean enabled = false; - - /** - * Credential information stored inside thrift-client-config.xml file - * is parsed and assigned into ip,port,username and password fields - * - * @param streamDefinition - */ - public WSO2CEPStatisticsPublisher(StreamDefinition streamDefinition) { - ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance(); - thriftClientConfig.getThriftClientInfo(); - - this.streamDefinition = streamDefinition; - this.ip = thriftClientConfig.getThriftClientInfo().getIp(); - this.port = thriftClientConfig.getThriftClientInfo().getPort(); - this.username = thriftClientConfig.getThriftClientInfo().getUsername(); - this.password = thriftClientConfig.getThriftClientInfo().getPassword(); - - enabled = Boolean.getBoolean("cep.stats.publisher.enabled"); - if (enabled) { - init(); - } - } - - private void init() { - AgentConfiguration agentConfiguration = new AgentConfiguration(); - Agent agent = new Agent(agentConfiguration); - - // Initialize asynchronous data publisher - asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent); - asyncDataPublisher.addStreamDefinition(streamDefinition); - } - - @Override - public void setEnabled(boolean enabled) { - this.enabled = enabled; - if (this.enabled) { - init(); - } - } - - @Override - public boolean isEnabled() { - return enabled; - } - - @Override - public void publish(Object[] payload) { - if (!isEnabled()) { - throw new RuntimeException("Statistics publisher is not enabled"); - } - - Event event = new Event(); - event.setPayloadData(payload); - event.setArbitraryDataMap(new HashMap<String, String>()); - - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); - } - asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); - } catch (AgentException e) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java index 09d5b5e..352041d 100644 --- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java +++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java @@ -20,7 +20,8 @@ package org.apache.stratos.common.test; import junit.framework.TestCase; -import org.apache.stratos.common.statistics.publisher.wso2.cep.ThriftClientConfig; +import org.apache.stratos.common.statistics.publisher.ThriftClientConfig; +import org.apache.stratos.common.statistics.publisher.ThriftClientInfo; import org.junit.Test; import java.net.URL; @@ -41,11 +42,11 @@ public class ThriftClientConfigParserTest extends TestCase { URL configFileUrl = ThriftClientConfigParserTest.class.getResource("/thrift-client-config.xml"); System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, configFileUrl.getPath()); ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance(); - thriftClientConfig.getThriftClientInfo(); + ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo(); - assertEquals("Incorrect Password", "admin", thriftClientConfig.getThriftClientInfo().getUsername()); - assertEquals("Incorrect Password", "1234", thriftClientConfig.getThriftClientInfo().getPassword()); - assertEquals("Incorrect IP", "192.168.10.10", thriftClientConfig.getThriftClientInfo().getIp()); - assertEquals("Incorrect Port", "9300", thriftClientConfig.getThriftClientInfo().getPort()); + assertEquals("Incorrect Username", "admin", thriftClientInfo.getUsername()); + assertEquals("Incorrect Password", "1234", thriftClientInfo.getPassword()); + assertEquals("Incorrect IP", "192.168.10.10", thriftClientInfo.getIp()); + assertEquals("Incorrect Port", "9300", thriftClientInfo.getPort()); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index 0cd76af..0dbd1e1 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -139,7 +139,7 @@ class HealthStatisticsPublisher: stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION # stream_def.add_payloaddata_attribute() - stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG) + stream_def.add_payloaddata_attribute("timestamp", StreamDefinition.LONG) stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) @@ -226,7 +226,7 @@ class DefaultHealthStatisticsReader: (one, five, fifteen) = os.getloadavg() cores = multiprocessing.cpu_count() - return (one/cores) * 100 + return (one / cores) * 100 class CartridgeHealthStatistics: http://git-wip-us.apache.org/repos/asf/stratos/blob/b530e9de/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml index a256770..9e0a833 100644 --- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml +++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml @@ -30,7 +30,7 @@ <correlationData> </correlationData> <payloadData> - <property name="time_stamp" type="long"/> + <property name="timestamp" type="long"/> <property name="cluster_id" type="String"/> <property name="cluster_instance_id" type="String"/> <property name="network_partition_id" type="String"/> @@ -93,7 +93,7 @@ <correlationData> </correlationData> <payloadData> - <property name="time_stamp" type="long"/> + <property name="timestamp" type="long"/> <property name="cluster_id" type="String"/> <property name="cluster_instance_id" type="String"/> <property name="network_partition_id" type="String"/>
