http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java deleted file mode 100644 index 8038d42..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.app.messaging; - -import backtype.storm.spout.Scheme; -import com.typesafe.config.Config; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, KafkaStreamSinkConfig,KafkaStreamSource,KafkaStreamSourceConfig> { - private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class); - private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic"; - private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic"; - private static final String DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY = "dataSourceConfig.schemeCls"; - - private String getSinkTopicName(String streamId, Config config) { - String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId); - if (config.hasPath(streamSpecificTopicConfigKey)) { - return config.getString(streamSpecificTopicConfigKey); - } else if (config.hasPath(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)) { - LOG.warn("Using default shared sink topic {}: {}", DEFAULT_SHARED_SINK_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY)); - return config.getString(DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); - } else { - LOG.error("Neither stream specific topic: {} nor default shared topic: {} found in config", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); - throw new IllegalArgumentException("Neither stream specific topic: " - + streamSpecificTopicConfigKey + " nor default shared topic: " + DEFAULT_SHARED_SINK_TOPIC_CONF_KEY + " found in config"); - } - } - - private String getSourceTopicName(String streamId, Config config) { - String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId); - if (config.hasPath(streamSpecificTopicConfigKey)) { - return config.getString(streamSpecificTopicConfigKey); - } else if 
(config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) { - LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY, config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)); - return config.getString(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY); - } else { - LOG.debug("Neither stream specific topic: {} nor default shared topic: {} found in config, try sink config instead", streamSpecificTopicConfigKey, DEFAULT_SHARED_SINK_TOPIC_CONF_KEY); - return getSinkTopicName(streamId,config); - } - } - - private String getSourceSchemeCls(String streamId, Config config) { - String streamSpecificSchemeClsKey = String.format("dataSourceConfig.%s.schemeCls", streamId); - if (config.hasPath(streamSpecificSchemeClsKey) ) { - return config.getString(streamSpecificSchemeClsKey); - } else if (config.hasPath(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY)) { - LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY, config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY)); - return config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY); - } - return null; - } - - @Override - public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) { - KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig(); - sinkConfig.setTopicId(getSinkTopicName(streamId,config)); - sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList")); - sinkConfig.setSerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.serializerClass") - ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder"); - sinkConfig.setKeySerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.keySerializerClass") - ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder"); - - // new added properties for async producer - sinkConfig.setNumBatchMessages(hasNonBlankConfigPath(config, "dataSinkConfig.numBatchMessages") - ? 
config.getString("dataSinkConfig.numBatchMessages") : "1024"); - sinkConfig.setProducerType(hasNonBlankConfigPath(config, "dataSinkConfig.producerType") - ? config.getString("dataSinkConfig.producerType") : "async"); - sinkConfig.setMaxQueueBufferMs(hasNonBlankConfigPath(config, "dataSinkConfig.maxQueueBufferMs") - ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000"); - sinkConfig.setRequestRequiredAcks(hasNonBlankConfigPath(config, "dataSinkConfig.requestRequiredAcks") - ? config.getString("dataSinkConfig.requestRequiredAcks") : "1"); - - return sinkConfig; - } - - @Override - public KafkaStreamSink getSink() { - return new KafkaStreamSink(); - } - - private boolean hasNonBlankConfigPath(Config config, String configName) { - return config.hasPath(configName) && StringUtils.isNotBlank(config.getString(configName)); - } - - @Override - public KafkaStreamSourceConfig getSourceConfig(String streamId, Config config) { - KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig(); - - sourceConfig.setTopicId(getSourceTopicName(streamId,config)); - sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection")); - - if (hasNonBlankConfigPath(config, "dataSourceConfig.fetchSize")) { - sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.transactionZKRoot")) { - sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.consumerGroupId")) { - sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.brokerZkPath")) { - sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.txZkServers")) { - sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers")); - } - if 
(hasNonBlankConfigPath(config, "dataSourceConfig.transactionStateUpdateMS")) { - sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.startOffsetTime")) { - sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime")); - } - if (hasNonBlankConfigPath(config, "dataSourceConfig.forceFromStart")) { - sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart")); - } - String schemeCls = getSourceSchemeCls(streamId, config); - if (schemeCls != null && StringUtils.isNotBlank(schemeCls)) { - try { - sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(schemeCls)); - } catch (ClassNotFoundException e) { - LOG.error("Class not found error, dataSourceConfig.schemeCls = {}", schemeCls, e); - } - } - return sourceConfig; - } - - @Override - public KafkaStreamSource getSource() { - return new KafkaStreamSource(); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java deleted file mode 100644 index 696d79f..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSink.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.messaging; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import com.fasterxml.jackson.databind.ObjectMapper; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Properties; - -public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> { - private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class); - private String topicId; - private Producer producer; - private KafkaStreamSinkConfig config; - - @Override - public void init(String streamId, KafkaStreamSinkConfig config) { - super.init(streamId, config); - this.topicId = config.getTopicId(); - this.config = config; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - super.prepare(stormConf, context, collector); - Properties properties = new Properties(); - properties.put("metadata.broker.list", config.getBrokerList()); - properties.put("serializer.class", config.getSerializerClass()); - properties.put("key.serializer.class", config.getKeySerializerClass()); - // new added properties for async producer - properties.put("producer.type", config.getProducerType()); - properties.put("batch.num.messages", config.getNumBatchMessages()); - properties.put("request.required.acks", config.getRequestRequiredAcks()); - properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs()); - ProducerConfig producerConfig = new ProducerConfig(properties); - producer = new Producer(producerConfig); - } - - @Override - protected void execute(Object key, Map event, OutputCollector collector) throws Exception { - try { - String output = new ObjectMapper().writeValueAsString(event); - // partition key may cause data skew - //producer.send(new KeyedMessage(this.topicId, key, output)); - 
producer.send(new KeyedMessage(this.topicId, output)); - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - throw ex; - } - } - - @Override - public void afterInstall() { - ensureTopicCreated(); - } - - private void ensureTopicCreated() { - LOG.info("TODO: ensure kafka topic {} created", this.topicId); - } - - private void ensureTopicDeleted() { - LOG.info("TODO: ensure kafka topic {} deleted", this.topicId); - } - - @Override - public void cleanup() { - if (this.producer != null) { - this.producer.close(); - } - } - - @Override - public void afterUninstall() { - ensureTopicDeleted(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java deleted file mode 100644 index bdc4f53..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.metadata.model.StreamSinkConfig; - -import java.util.Objects; - -/** - * FIXME Rename to KafkaStreamMessagingConfig. - */ -public class KafkaStreamSinkConfig implements StreamSinkConfig { - // Write Config - private String topicId; - private String brokerList; - private String serializerClass; - private String keySerializerClass; - private String numBatchMessages; - private String maxQueueBufferMs; - private String producerType; - private String requestRequiredAcks; - - public String getTopicId() { - return topicId; - } - - public void setTopicId(String topicId) { - this.topicId = topicId; - } - - public String getBrokerList() { - return brokerList; - } - - public void setBrokerList(String brokerList) { - this.brokerList = brokerList; - } - - public String getSerializerClass() { - return serializerClass; - } - - public void setSerializerClass(String serializerClass) { - this.serializerClass = serializerClass; - } - - public String getKeySerializerClass() { - return keySerializerClass; - } - - public void setKeySerializerClass(String keySerializerClass) { - this.keySerializerClass = keySerializerClass; - } - - public String getNumBatchMessages() { - return numBatchMessages; - } - - public void setNumBatchMessages(String numBatchMessages) { - this.numBatchMessages = numBatchMessages; - } - - public String getMaxQueueBufferMs() { - return maxQueueBufferMs; - } - - public void setMaxQueueBufferMs(String 
maxQueueBufferMs) { - this.maxQueueBufferMs = maxQueueBufferMs; - } - - public String getProducerType() { - return producerType; - } - - public void setProducerType(String producerType) { - this.producerType = producerType; - } - - public String getRequestRequiredAcks() { - return requestRequiredAcks; - } - - public void setRequestRequiredAcks(String requestRequiredAcks) { - this.requestRequiredAcks = requestRequiredAcks; - } - - @Override - public String getType() { - return "KAFKA"; - } - - @Override - public Class<?> getSinkType() { - return KafkaStreamSink.class; - } - - @Override - public Class<? extends StreamSinkConfig> getConfigType() { - return KafkaStreamSinkConfig.class; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof KafkaStreamSinkConfig)) { - return false; - } - - KafkaStreamSinkConfig config = (KafkaStreamSinkConfig) o; - - if (!getTopicId().equals(config.getTopicId())) { - return false; - } - if (getBrokerList() != null ? !getBrokerList().equals(config.getBrokerList()) : config.getBrokerList() != null) { - return false; - } - if (getSerializerClass() != null ? !getSerializerClass().equals(config.getSerializerClass()) : config.getSerializerClass() != null) { - return false; - } - if (getKeySerializerClass() != null ? !getKeySerializerClass().equals(config.getKeySerializerClass()) : config.getKeySerializerClass() != null) { - return false; - } - if (getNumBatchMessages() != null ? !getNumBatchMessages().equals(config.getNumBatchMessages()) : config.getNumBatchMessages() != null) { - return false; - } - if (getMaxQueueBufferMs() != null ? !getMaxQueueBufferMs().equals(config.getMaxQueueBufferMs()) : config.getMaxQueueBufferMs() != null) { - return false; - } - if (getProducerType() != null ? !getProducerType().equals(config.getProducerType()) : config.getProducerType() != null) { - return false; - } - return getRequestRequiredAcks() != null ? 
getRequestRequiredAcks().equals(config.getRequestRequiredAcks()) : config.getRequestRequiredAcks() == null; - - } - - @Override - public int hashCode() { - int result = getTopicId().hashCode(); - result = 31 * result + (getBrokerList() != null ? getBrokerList().hashCode() : 0); - result = 31 * result + (getSerializerClass() != null ? getSerializerClass().hashCode() : 0); - result = 31 * result + (getKeySerializerClass() != null ? getKeySerializerClass().hashCode() : 0); - result = 31 * result + (getNumBatchMessages() != null ? getNumBatchMessages().hashCode() : 0); - result = 31 * result + (getMaxQueueBufferMs() != null ? getMaxQueueBufferMs().hashCode() : 0); - result = 31 * result + (getProducerType() != null ? getProducerType().hashCode() : 0); - result = 31 * result + (getRequestRequiredAcks() != null ? getRequestRequiredAcks().hashCode() : 0); - return result; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java deleted file mode 100644 index 5cc5145..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import backtype.storm.spout.Scheme; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.spout.SchemeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.*; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; - -public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig> { - private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamSource.class); - private KafkaSpout spout; - - @Override - public void init(String streamId, KafkaStreamSourceConfig config) { - this.spout = createKafkaSpout(config); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.spout.open(conf, context, collector); - } - - @Override - public void nextTuple() { - this.spout.nextTuple(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - this.spout.declareOutputFields(declarer); - } - - @Override - public void close() { - this.spout.close(); - } - - @Override - public void activate() { - this.spout.activate(); - } - - @Override - public void deactivate() { - this.spout.deactivate(); - } - - @Override - public void ack(Object msgId) { 
- this.spout.ack(msgId); - } - - @Override - public void fail(Object msgId) { - this.spout.fail(msgId); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return this.spout.getComponentConfiguration(); - } - - // ---------------- - // Helper Methods - // ---------------- - - private static KafkaSpout createKafkaSpout(KafkaStreamSourceConfig config) { - - // the following is for fetching data from one topic - // Kafka topic - String topic = config.getTopicId(); - // Kafka broker zk connection - String zkConnString = config.getBrokerZkQuorum(); - // Kafka fetch size - int fetchSize = config.getFetchSize(); - LOG.info(String.format("Use topic : %s, zkQuorum : %s , fetchSize : %d", topic, zkConnString, fetchSize)); - - /* - the following is for recording offset for processing the data - the zk path to store current offset is comprised of the following - offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id - - consumerGroupId is for differentiating different consumers which consume the same topic - */ - // transaction zkRoot - String zkRoot = config.getTransactionZKRoot(); - // Kafka consumer group id - String groupId = config.getConsumerGroupId(); - String brokerZkPath = config.getBrokerZkPath(); - - BrokerHosts hosts; - if (StringUtils.isNotBlank(brokerZkPath)) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers to store kafka consumer offset. 
Default to use storm zookeeper - if (StringUtils.isNotBlank(config.getTransactionZkServers())) { - String[] txZkServers = config.getTransactionZkServers().split(","); - spoutConfig.zkServers = Arrays.stream(txZkServers).map(server -> server.split(":")[0]).collect(Collectors.toList()); - spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]); - LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort); - } - - // transaction update interval - spoutConfig.stateUpdateIntervalMs = config.getTransactionStateUpdateMS(); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); - - // "startOffsetTime" is for test usage, prod should not use this - if (config.getStartOffsetTime() >= 0) { - spoutConfig.startOffsetTime = config.getStartOffsetTime(); - } - // "forceFromStart" is for test usage, prod should not use this - if (config.isForceFromStart()) { - spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - } - - Preconditions.checkNotNull(config.getSchemaClass(), "schemaClass is null"); - try { - Scheme s = config.getSchemaClass().newInstance(); - spoutConfig.scheme = new SchemeAsMultiScheme(s); - } catch (Exception ex) { - LOG.error("Error instantiating scheme object"); - throw new IllegalStateException(ex); - } - return new KafkaSpout(spoutConfig); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java deleted file mode 100644 index d0a91da..0000000 --- 
a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.metadata.model.StreamSourceConfig; - -public class KafkaStreamSourceConfig implements StreamSourceConfig { - private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource"; - private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers"; - private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class; - - // Read Config - private String topicId; - private String brokerZkQuorum; - private String brokerZkBasePath; - private String transactionZkServers; - - private int fetchSize = 1048576; - private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT; - private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID; - private String brokerZkPath = "/brokers"; - private long transactionStateUpdateMS = 2000; - private int startOffsetTime = -1; - private boolean forceFromStart = false; - private Class<? 
extends backtype.storm.spout.Scheme> schemaClass = DEFAULT_KAFKA_SCHEMA; - - public String getBrokerZkQuorum() { - return brokerZkQuorum; - } - - public void setBrokerZkQuorum(String brokerZkQuorum) { - this.brokerZkQuorum = brokerZkQuorum; - } - - public String getBrokerZkBasePath() { - return brokerZkBasePath; - } - - public void setBrokerZkBasePath(String brokerZkBasePath) { - this.brokerZkBasePath = brokerZkBasePath; - } - - public int getFetchSize() { - return fetchSize; - } - - public void setFetchSize(int fetchSize) { - this.fetchSize = fetchSize; - } - - public String getTransactionZKRoot() { - return transactionZKRoot; - } - - public void setTransactionZKRoot(String transactionZKRoot) { - this.transactionZKRoot = transactionZKRoot; - } - - public String getConsumerGroupId() { - return consumerGroupId; - } - - public void setConsumerGroupId(String consumerGroupId) { - this.consumerGroupId = consumerGroupId; - } - - public String getBrokerZkPath() { - return brokerZkPath; - } - - public void setBrokerZkPath(String brokerZkPath) { - this.brokerZkPath = brokerZkPath; - } - - public String getTransactionZkServers() { - return transactionZkServers; - } - - public void setTransactionZkServers(String transactionZkServers) { - this.transactionZkServers = transactionZkServers; - } - - public long getTransactionStateUpdateMS() { - return transactionStateUpdateMS; - } - - public void setTransactionStateUpdateMS(long transactionStateUpdateMS) { - this.transactionStateUpdateMS = transactionStateUpdateMS; - } - - public int getStartOffsetTime() { - return startOffsetTime; - } - - public void setStartOffsetTime(int startOffsetTime) { - this.startOffsetTime = startOffsetTime; - } - - public boolean isForceFromStart() { - return forceFromStart; - } - - public void setForceFromStart(boolean forceFromStart) { - this.forceFromStart = forceFromStart; - } - - public Class<? 
extends backtype.storm.spout.Scheme> getSchemaClass() { - return schemaClass; - } - - public void setSchemaClass(Class<? extends backtype.storm.spout.Scheme> schemaClass) { - this.schemaClass = schemaClass; - } - - @Override - public String getType() { - return "KAFKA"; - } - - @Override - public Class<?> getSourceType() { - return KafkaStreamSource.class; - } - - @Override - public Class<? extends StreamSourceConfig> getConfigType() { - return StreamSourceConfig.class; - } - - public String getTopicId() { - return topicId; - } - - public void setTopicId(String topicId) { - this.topicId = topicId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof KafkaStreamSourceConfig)) { - return false; - } - - KafkaStreamSourceConfig that = (KafkaStreamSourceConfig) o; - - if (getFetchSize() != that.getFetchSize()) { - return false; - } - if (getTransactionStateUpdateMS() != that.getTransactionStateUpdateMS()) { - return false; - } - if (getStartOffsetTime() != that.getStartOffsetTime()) { - return false; - } - if (isForceFromStart() != that.isForceFromStart()) { - return false; - } - if (getTopicId() != null ? !getTopicId().equals(that.getTopicId()) : that.getTopicId() != null) { - return false; - } - if (getBrokerZkQuorum() != null ? !getBrokerZkQuorum().equals(that.getBrokerZkQuorum()) : that.getBrokerZkQuorum() != null) { - return false; - } - if (getBrokerZkBasePath() != null ? !getBrokerZkBasePath().equals(that.getBrokerZkBasePath()) : that.getBrokerZkBasePath() != null) { - return false; - } - if (getTransactionZkServers() != null ? !getTransactionZkServers().equals(that.getTransactionZkServers()) : that.getTransactionZkServers() != null) { - return false; - } - if (getTransactionZKRoot() != null ? !getTransactionZKRoot().equals(that.getTransactionZKRoot()) : that.getTransactionZKRoot() != null) { - return false; - } - if (getConsumerGroupId() != null ? 
!getConsumerGroupId().equals(that.getConsumerGroupId()) : that.getConsumerGroupId() != null) { - return false; - } - if (getBrokerZkPath() != null ? !getBrokerZkPath().equals(that.getBrokerZkPath()) : that.getBrokerZkPath() != null) { - return false; - } - return getSchemaClass() != null ? getSchemaClass().equals(that.getSchemaClass()) : that.getSchemaClass() == null; - - } - - @Override - public int hashCode() { - int result = getTopicId() != null ? getTopicId().hashCode() : 0; - result = 31 * result + (getBrokerZkQuorum() != null ? getBrokerZkQuorum().hashCode() : 0); - result = 31 * result + (getBrokerZkBasePath() != null ? getBrokerZkBasePath().hashCode() : 0); - result = 31 * result + (getTransactionZkServers() != null ? getTransactionZkServers().hashCode() : 0); - result = 31 * result + getFetchSize(); - result = 31 * result + (getTransactionZKRoot() != null ? getTransactionZKRoot().hashCode() : 0); - result = 31 * result + (getConsumerGroupId() != null ? getConsumerGroupId().hashCode() : 0); - result = 31 * result + (getBrokerZkPath() != null ? getBrokerZkPath().hashCode() : 0); - result = 31 * result + (int) (getTransactionStateUpdateMS() ^ (getTransactionStateUpdateMS() >>> 32)); - result = 31 * result + getStartOffsetTime(); - result = 31 * result + (isForceFromStart() ? 1 : 0); - result = 31 * result + (getSchemaClass() != null ? 
getSchemaClass().hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java deleted file mode 100644 index 90e6481..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.messaging; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.metadata.model.MetricSchemaEntity; -import org.apache.eagle.app.environment.builder.MetricDescriptor; -import org.apache.eagle.service.client.EagleServiceClientException; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; - -public class MetricSchemaGenerator extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(MetricSchemaGenerator.class); - private static int MAX_CACHE_LENGTH = 1000; - public static final String GENERIC_METRIC_VALUE_NAME = "value"; - - private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH); - private final MetricDescriptor metricDescriptor; - private final Config config; - - private OutputCollector collector; - private IEagleServiceClient client; - - public MetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) { - this.metricDescriptor = metricDescriptor; - this.config = config; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - this.client = new EagleServiceClientImpl(config); - } - - @Override - public void execute(Tuple input) { - try { - String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD); - synchronized (metricNameCache) { - if (!metricNameCache.contains(metricName)) { - createMetricSchemaEntity(metricName, (Map) 
input.getValueByField(MetricStreamPersist.METRIC_EVENT_FIELD),this.metricDescriptor); - metricNameCache.add(metricName); - } - if (metricNameCache.size() > MAX_CACHE_LENGTH) { - this.metricNameCache.clear(); - } - } - this.collector.ack(input); - } catch (Throwable throwable) { - LOG.warn(throwable.getMessage(), throwable); - this.collector.reportError(throwable); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - - @Override - public void cleanup() { - if (this.client != null) { - try { - this.client.close(); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - } - - private void createMetricSchemaEntity(String metricName, Map event, MetricDescriptor metricDescriptor) throws IOException, EagleServiceClientException { - MetricSchemaEntity schemaEntity = new MetricSchemaEntity(); - Map<String, String> schemaTags = new HashMap<>(); - schemaEntity.setTags(schemaTags); - schemaTags.put(MetricSchemaEntity.METRIC_SITE_TAG, metricDescriptor.getSiteIdSelector().getSiteId(event)); - schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName); - schemaTags.put(MetricSchemaEntity.METRIC_GROUP_TAG, metricDescriptor.getMetricGroupSelector().getMetricGroup(event)); - schemaEntity.setGranularityByField(metricDescriptor.getGranularity()); - schemaEntity.setDimensionFields(metricDescriptor.getDimensionFields()); - schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME)); - schemaEntity.setModifiedTimestamp(System.currentTimeMillis()); - GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(schemaEntity)); - if (response.isSuccess()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created {}", schemaEntity); - } - } else { - LOG.error("Failed to create {}", schemaEntity, response.getException()); - throw new IOException("Service error: " + response.getException()); - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java deleted file mode 100644 index c9b43e5..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.messaging; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.apache.eagle.app.environment.builder.MetricDescriptor; -import org.apache.eagle.app.utils.StreamConvertHelper; -import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.log.entity.GenericMetricEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.BatchSender; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class MetricStreamPersist extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class); - public static final String METRIC_NAME_FIELD = "metricName"; - public static final String METRIC_EVENT_FIELD = "metricEvent"; - - private final Config config; - private final MetricMapper mapper; - private final int batchSize; - private IEagleServiceClient client; - private OutputCollector collector; - private BatchSender batchSender; - - public MetricStreamPersist(MetricDescriptor metricDescriptor, Config config) { - this.config = config; - this.mapper = new StructuredMetricMapper(metricDescriptor); - this.batchSize = config.hasPath("service.batchSize") ? 
config.getInt("service.batchSize") : 1; - } - - public MetricStreamPersist(MetricMapper mapper, Config config) { - this.config = config; - this.mapper = mapper; - this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(config); - if (this.batchSize > 0) { - this.batchSender = client.batch(this.batchSize); - } - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - GenericMetricEntity metricEntity = null; - Map event = null; - try { - event = StreamConvertHelper.tupleToEvent(input).f1(); - metricEntity = this.mapper.map(event); - if (batchSize <= 1) { - GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity)); - if (!response.isSuccess()) { - LOG.error("Service side error: {}", response.getException()); - collector.reportError(new IllegalStateException(response.getException())); - } - } else { - this.batchSender.send(metricEntity); - } - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - collector.reportError(ex); - } finally { - if (metricEntity != null && event != null) { - collector.emit(Arrays.asList(metricEntity.getPrefix(), event)); - } - collector.ack(input); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(METRIC_NAME_FIELD, METRIC_EVENT_FIELD)); - } - - @Override - public void cleanup() { - try { - this.client.close(); - } catch (IOException e) { - LOG.error("Close client error: {}", e.getMessage(), e); - } finally { - super.cleanup(); - } - } - - @FunctionalInterface - public interface MetricMapper extends Serializable { - GenericMetricEntity map(Map event); - } - - public class StructuredMetricMapper implements MetricMapper { - private final MetricDescriptor metricDescriptor; - - private 
StructuredMetricMapper(MetricDescriptor metricDescriptor) { - this.metricDescriptor = metricDescriptor; - } - - @Override - public GenericMetricEntity map(Map event) { - String metricName = metricDescriptor.getMetricNameSelector().getMetricName(event); - Preconditions.checkNotNull(metricName, "Metric name is null"); - Long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event); - Preconditions.checkNotNull(timestamp, "Timestamp is null"); - Map<String, String> tags = new HashMap<>(); - for (String dimensionField : metricDescriptor.getDimensionFields()) { - Preconditions.checkNotNull(dimensionField, "Dimension field name is null"); - tags.put(dimensionField, (String) event.get(dimensionField)); - } - - double[] values; - if (event.containsKey(metricDescriptor.getValueField())) { - values = new double[] {(double) event.get(metricDescriptor.getValueField())}; - } else { - LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); - values = new double[] {0}; - } - - GenericMetricEntity entity = new GenericMetricEntity(); - entity.setPrefix(metricName); - entity.setTimestamp(DateTimeUtil.roundDown(metricDescriptor.getGranularity(), timestamp)); - entity.setTags(tags); - entity.setValue(values); - return entity; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java deleted file mode 100644 index ef6c8d6..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSink.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import backtype.storm.task.OutputCollector; -import backtype.storm.topology.base.BaseRichBolt; -import org.apache.eagle.app.utils.StreamConvertHelper; -import org.apache.eagle.common.utils.Tuple2; -import org.apache.eagle.metadata.model.StreamSinkConfig; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> { - private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class); - private String streamId; - private OutputCollector collector; - - @Override - public void init(String streamId, K config) { - this.streamId = streamId; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - /** - * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map]. 
- */ - @Override - public void execute(Tuple input) { - try { - Tuple2<Object,Map> keyValue = StreamConvertHelper.tupleToEvent(input); - execute(keyValue.f0(), keyValue.f1(), collector); - collector.ack(input); - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - collector.reportError(ex); - } - } - - protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception; - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - public String getStreamId() { - return streamId; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java deleted file mode 100644 index b31de46..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StormStreamSource.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import backtype.storm.topology.base.BaseRichSpout; -import org.apache.eagle.metadata.model.StreamSinkConfig; -import org.apache.eagle.metadata.model.StreamSourceConfig; - -public abstract class StormStreamSource<T extends StreamSourceConfig> extends BaseRichSpout implements StreamSource<T> { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java deleted file mode 100644 index 42ea36e..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.alert.engine.model.StreamEvent; -import backtype.storm.tuple.Tuple; - -import java.io.Serializable; -import java.util.List; - -@FunctionalInterface -public interface StreamEventMapper extends Serializable { - /** - * Map from storm tuple to Stream Event. - * - * @param tuple - * @return - * @throws Exception - */ - List<StreamEvent> map(Tuple tuple) throws Exception; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java deleted file mode 100644 index 0dbc1a7..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamProvider.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.messaging; - -import com.typesafe.config.Config; -import org.apache.eagle.metadata.model.StreamSinkConfig; -import org.apache.eagle.metadata.model.StreamSourceConfig; - -import java.lang.reflect.ParameterizedType; - -/** - * Stream Messaging Bus. - */ -public interface StreamProvider<W extends StreamSink<C>, C extends StreamSinkConfig, - R extends StreamSource<F>, F extends StreamSourceConfig> { - - C getSinkConfig(String streamId, Config config); - - W getSink(); - - default W getSink(String streamId, Config config) { - W s = getSink(); - s.init(streamId, getSinkConfig(streamId, config)); - return s; - } - - F getSourceConfig(String streamId, Config config); - - R getSource(); - - default R getSource(String streamId, Config config) { - R i = getSource(); - i.init(streamId, getSourceConfig(streamId, config)); - return i; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java deleted file mode 100644 index f76cdbf..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
/**
 * A stream event represented as a serializable map from field name to value.
 */
public class StreamRecord extends HashMap<String,Object> implements Serializable {

    /** Creates an empty record. */
    public StreamRecord() {
    }

    /**
     * Creates a record holding a copy of the given event's entries.
     *
     * @param event entries to copy; later changes to {@code event} are not reflected
     * @throws NullPointerException if {@code event} is null
     */
    public StreamRecord(Map<String,Object> event) {
        // Use the HashMap copy constructor rather than calling the overridable putAll()
        // from a constructor; it also sizes the table for the incoming entries up front.
        super(event);
    }
}
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.app.ApplicationLifecycle; -import org.apache.eagle.metadata.model.StreamSinkConfig; - -public interface StreamSink<T extends StreamSinkConfig> extends ApplicationLifecycle { - void init(String streamId,T config); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java deleted file mode 100644 index af3965f..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamSource.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.messaging; - -import org.apache.eagle.metadata.model.StreamSourceConfig; - -public interface StreamSource<T extends StreamSourceConfig> { - void init(String streamId, T config); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java deleted file mode 100644 index a74b6f2..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationExtensionLoader.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.module; - -import org.apache.eagle.app.service.ApplicationProviderService; -import org.apache.eagle.common.module.ModuleRegistry; -import org.apache.eagle.common.module.ModuleRegistryImpl; -import com.google.inject.Guice; -import com.google.inject.Module; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ApplicationExtensionLoader { - private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationExtensionLoader.class); - - public static ModuleRegistry load(Module... context) { - LOGGER.warn("Loading application extension modules"); - ModuleRegistry registry = new ModuleRegistryImpl(); - Guice.createInjector(context).getInstance(ApplicationProviderService.class).getProviders().forEach((provider) -> { - LOGGER.warn("Registering modules from {}", provider); - provider.register(registry); - }); - return registry; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java deleted file mode 100644 index 6c8c310..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/module/ApplicationGuiceModule.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.module; - -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.app.service.ApplicationHealthCheckService; -import org.apache.eagle.app.service.ApplicationManagementService; -import org.apache.eagle.app.service.ApplicationProviderService; -import org.apache.eagle.app.service.impl.ApplicationHealthCheckServiceImpl; -import org.apache.eagle.app.service.impl.ApplicationManagementServiceImpl; -import org.apache.eagle.app.service.impl.ApplicationProviderServiceImpl; -import org.apache.eagle.app.service.impl.ApplicationStatusUpdateServiceImpl; -import org.apache.eagle.metadata.service.ApplicationDescService; -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; -import com.google.inject.util.Providers; -import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; - -public class ApplicationGuiceModule extends AbstractModule { - private final ApplicationProviderService appProviderInst; - - public ApplicationGuiceModule(ApplicationProviderService appProviderInst) { - this.appProviderInst = appProviderInst; - } - - public ApplicationGuiceModule() { - this.appProviderInst = new 
ApplicationProviderServiceImpl(ConfigFactory.load()); - } - - @Override - protected void configure() { - bind(ApplicationProviderService.class).toProvider(Providers.of(appProviderInst)); - bind(ApplicationDescService.class).toProvider(Providers.of(appProviderInst)); - bind(ApplicationManagementService.class).to(ApplicationManagementServiceImpl.class).in(Singleton.class); - bind(ApplicationStatusUpdateService.class).to(ApplicationStatusUpdateServiceImpl.class).in(Singleton.class); - bind(ApplicationHealthCheckService.class).to(ApplicationHealthCheckServiceImpl.class).in(Singleton.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java deleted file mode 100644 index b91a70c..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - * <h1>Application Management Framework Interfaces</h1> - * - * <ul> - * <li>Application Context (Runtime): org.apache.eagle.app.service.ApplicationOperationContext</li> - * <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li> - * <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li> - * <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycle</li> - * </ul> - */ -package org.apache.eagle.app; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java deleted file mode 100644 index 3c62367..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.resource; - -import org.apache.eagle.app.service.ApplicationManagementService; -import org.apache.eagle.app.service.ApplicationOperations; -import org.apache.eagle.app.service.ApplicationProviderService; -import org.apache.eagle.common.rest.RESTResponse; -import org.apache.eagle.metadata.model.ApplicationDesc; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.metadata.service.ApplicationEntityService; -import com.google.inject.Inject; - -import java.util.Collection; -import javax.ws.rs.*; -import javax.ws.rs.core.MediaType; - -@Path("/apps") -public class ApplicationResource { - private final ApplicationProviderService providerService; - private final ApplicationManagementService applicationManagementService; - private final ApplicationEntityService entityService; - - @Inject - public ApplicationResource( - ApplicationProviderService providerService, - ApplicationManagementService applicationManagementService, - ApplicationEntityService entityService) { - this.providerService = providerService; - this.applicationManagementService = applicationManagementService; - this.entityService = entityService; - } - - @GET - @Path("/providers") - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Collection<ApplicationDesc>> getApplicationDescs() { - return RESTResponse.async(providerService::getApplicationDescs).get(); - } - - @GET - @Path("/providers/{type}") - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<ApplicationDesc> 
getApplicationDescByType(@PathParam("type") String type) { - return RESTResponse.async(() -> providerService.getApplicationDescByType(type)).get(); - } - - @PUT - @Path("/providers/reload") - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Collection<ApplicationDesc>> reloadApplicationDescs() { - return RESTResponse.<Collection<ApplicationDesc>>async((response) -> { - providerService.reload(); - response.message("Successfully reload application providers"); - response.data(providerService.getApplicationDescs()); - }).get(); - } - - @GET - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Collection<ApplicationEntity>> getApplicationEntities(@QueryParam("siteId") String siteId) { - return RESTResponse.async(() -> { - if (siteId == null) { - return entityService.findAll(); - } else { - return entityService.findBySiteId(siteId); - } - }).get(); - } - - @GET - @Path("/{appUuid}") - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<ApplicationEntity> getApplicationEntityByUUID(@PathParam("appUuid") String appUuid) { - return RESTResponse.async(() -> entityService.getByUUID(appUuid)).get(); - } - - @POST - @Path("/{appUuid}") - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<ApplicationEntity> updateApplicationEntity(@PathParam("appUuid") String appUuid, ApplicationOperations.UpdateOperation updateOperation) { - return RESTResponse.async(() -> { - ApplicationEntity applicationEntity = new ApplicationEntity(); - applicationEntity.setStatus(entityService.getByUUID(appUuid).getStatus()); - applicationEntity.setUuid(appUuid); - applicationEntity.setJarPath(updateOperation.getJarPath()); - applicationEntity.setMode(updateOperation.getMode()); - applicationEntity.setConfiguration(updateOperation.getConfiguration()); - return entityService.update(applicationEntity); - }).get(); - } - - @POST - @Path("/status") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public 
RESTResponse<ApplicationEntity.Status> checkApplicationStatusByUUID(ApplicationOperations.CheckStatusOperation operation) { - return RESTResponse.<ApplicationEntity.Status>async((response) -> { - ApplicationEntity.Status status = (entityService.getByUUIDOrAppId(null, operation.getAppId())).getStatus(); - response.success(true).message("Successfully fetched application status"); - response.data(status); - }).get(); - } - - /** - * <b>Request:</b> - * <pre> - * { - * uuid: APPLICATION_UUID - * } - * </pre>. - */ - @POST - @Path("/install") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<ApplicationEntity> installApplication(ApplicationOperations.InstallOperation operation) { - return RESTResponse.<ApplicationEntity>async((response) -> { - ApplicationEntity entity = applicationManagementService.install(operation); - response.message("Successfully installed application " + operation.getAppType() + " onto site " + operation.getSiteId()); - response.data(entity); - }).get(); - } - - /** - * <b>Request:</b> - * <pre> - * { - * uuid: APPLICATION_UUID - * } - * </pre>. - * - * @param operation - */ - @DELETE - @Path("/uninstall") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> uninstallApplication(ApplicationOperations.UninstallOperation operation) { - return RESTResponse.<Void>async((response) -> { - ApplicationEntity entity = applicationManagementService.uninstall(operation); - response.success(true).message("Successfully uninstalled application " + entity.getUuid()); - }).get(); - } - - /** - * <b>Request:</b> - * <pre> - * { - * uuid: APPLICATION_UUID - * } - * </pre> - * operation. 
- * @param operation - */ - @POST - @Path("/start") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> startApplication(ApplicationOperations.StartOperation operation) { - return RESTResponse.<Void>async((response) -> { - ApplicationEntity entity = applicationManagementService.start(operation); - response.success(true).message("Starting application " + entity.getUuid()); - }).get(); - } - - /** - * <b>Request:</b> - * <pre> - * { - * uuid: APPLICATION_UUID - * } - * </pre>. - * - * @param operation - */ - @POST - @Path("/stop") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> stopApplication(ApplicationOperations.StopOperation operation) { - return RESTResponse.<Void>async((response) -> { - ApplicationEntity entity = applicationManagementService.stop(operation); - response.success(true).message("Stopping application " + entity.getUuid()); - }).get(); - } - -} \ No newline at end of file
