Repository: incubator-eagle
Updated Branches:
  refs/heads/master f07777706 -> 2e5cb719c


[MINOR] Enhance new publisher AlertEagleStorePlugin

https://issues.apache.org/jira/browse/EAGLE-681

* implement alert apis on jdbc & mongodb
* add a rest api for searching a alert by alertId

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #575 from qingwen220/EAGLE-681.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2e5cb719
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2e5cb719
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2e5cb719

Branch: refs/heads/master
Commit: 2e5cb719c3a49da920d8d6807e10bcfdb6e55934
Parents: f077777
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Fri Oct 28 10:10:50 2016 +0800
Committer: zombieJ <smith3...@gmail.com>
Committed: Fri Oct 28 10:10:50 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertPublishEvent.java   |  5 +-
 .../alert/engine/model/AlertStreamEvent.java    |  4 +-
 .../AbstractMetadataChangeNotifyService.java    |  4 +-
 .../impl/ZKMetadataChangeNotifyService.java     |  4 +-
 .../publisher/AlertPublishSpecListener.java     |  2 +-
 .../publisher/impl/AlertEagleStorePlugin.java   |  2 +-
 .../alert/engine/runner/AlertPublisherBolt.java |  3 +-
 .../metadata/resource/MetadataResource.java     |  8 +-
 .../eagle/alert/metadata/IMetadataDao.java      |  2 +
 .../eagle/alert/metadata/MetadataUtils.java     |  4 +
 .../metadata/impl/InMemMetadataDaoImpl.java     | 10 +++
 .../metadata/impl/JdbcDatabaseHandler.java      | 94 ++++++--------------
 .../metadata/impl/JdbcMetadataDaoImpl.java      | 11 ++-
 .../metadata/impl/MongoMetadataDaoImpl.java     | 23 ++++-
 .../alert/resource/impl/MongoImplTest.java      | 11 +++
 15 files changed, 106 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
index 9d73c2d..3453e55 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
@@ -96,7 +96,8 @@ public class AlertPublishEvent {
         AlertPublishEvent alertEvent = new AlertPublishEvent();
         alertEvent.setAlertId(UUID.randomUUID().toString());
         alertEvent.setPolicyId(event.getPolicyId());
-        if (event.getExtraData() != null) {
+        alertEvent.setAlertTimestamp(event.getCreatedTime());
+        if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
             
alertEvent.setSiteId(event.getExtraData().get(SITE_ID_KEY).toString());
             
alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString());
             alertEvent.setAppIds((List<String>) 
event.getExtraData().get(APP_IDS_KEY));
@@ -104,4 +105,6 @@ public class AlertPublishEvent {
         alertEvent.setAlertData(event.getDataMap());
         return alertEvent;
     }
+
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index e502244..442c885 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -72,8 +72,8 @@ public class AlertStreamEvent extends StreamEvent {
             }
         }
         return 
String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, 
createdBy=%s, metaVersion=%s]",
-            this.getStreamId(), 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-            StringUtils.join(dataStrings, ","), this.getPolicyId(), 
this.getCreatedBy(), this.getMetaVersion());
+                this.getStreamId(), 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+                StringUtils.join(dataStrings, ","), this.getPolicyId(), 
this.getCreatedBy(), this.getMetaVersion());
     }
 
     public String getCreatedBy() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
index bbb6ca1..3264e61 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/AbstractMetadataChangeNotifyService.java
@@ -103,8 +103,8 @@ public abstract class AbstractMetadataChangeNotifyService 
implements IMetadataCh
         alertPublishSpecListeners.forEach(s -> 
s.onAlertPublishSpecChange(alertPublishSpec, sds));
     }
 
-    protected void notifyAlertPublishBolt(Map<String, PolicyDefinition> pds) {
-        alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds));
+    protected void notifyAlertPublishBolt(Map<String, PolicyDefinition> pds, 
Map<String, StreamDefinition> sds) {
+        alertPublishSpecListeners.forEach(s -> s.onAlertPolicyChange(pds, 
sds));
     }
 
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
index 7bf4984..8dc6fdd 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/impl/ZKMetadataChangeNotifyService.java
@@ -123,7 +123,7 @@ public class ZKMetadataChangeNotifyService extends 
AbstractMetadataChangeNotifyS
                     prePopulate(alertSpec, state.getPolicySnapshots());
                     notifyAlertBolt(alertSpec, sds);
                     if (state.getPublishSpecs().get(topologyId) != null) {
-                        
notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()));
+                        
notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds);
                     }
                 }
                 break;
@@ -134,7 +134,7 @@ public class ZKMetadataChangeNotifyService extends 
AbstractMetadataChangeNotifyS
                 } else {
                     notifyAlertPublishBolt(pubSpec, sds);
                     if (state.getAlertSpecs().get(topologyId) != null) {
-                        
notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()));
+                        
notifyAlertPublishBolt(listToMap(state.getPolicySnapshots()), sds);
                     }
                 }
                 break;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
index d8429ca..b5401cb 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
@@ -25,5 +25,5 @@ import java.util.Map;
 public interface AlertPublishSpecListener {
     void onAlertPublishSpecChange(PublishSpec spec, Map<String, 
StreamDefinition> sds);
     
-    void onAlertPolicyChange(Map<String, PolicyDefinition> pds);
+    void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, 
StreamDefinition> sds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
index b9b5f2e..f517078 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePlugin.java
@@ -78,4 +78,4 @@ public class AlertEagleStorePlugin extends 
AbstractPublishPlugin {
         return null;
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index f0ad0ae..95f2a8f 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -118,8 +118,9 @@ public class AlertPublisherBolt extends AbstractStreamBolt 
implements AlertPubli
     }
 
     @Override
-    public void onAlertPolicyChange(Map<String, PolicyDefinition> pds) {
+    public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, 
Map<String, StreamDefinition> sds) {
         this.policyDefinitionMap = pds;
+        this.streamDefinitionMap = sds;
     }
 
     private void wrapAlertPublishEvent(AlertStreamEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index f5b34b8..05455f0 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -17,7 +17,6 @@
 package org.apache.eagle.service.metadata.resource;
 
 import com.google.common.base.Preconditions;
-import javafx.scene.control.Alert;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
@@ -33,7 +32,6 @@ import 
org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 import com.google.inject.Inject;
-import org.apache.storm.zookeeper.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -501,6 +499,12 @@ public class MetadataResource {
         return dao.listAlertPublishEvent(size);
     }
 
+    @Path("/alerts/{alertId}")
+    @GET
+    public AlertPublishEvent getAlertPublishEvent(@PathParam("alertId") String 
alertId) {
+        return dao.getAlertPublishEvent(alertId);
+    }
+
     @Path("/topologies/{topologyName}")
     @DELETE
     public OpResult removeTopology(@PathParam("topologyName") String 
topologyName) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index d25c548..33a3c39 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -80,6 +80,8 @@ public interface IMetadataDao extends Closeable {
 
     List<AlertPublishEvent> listAlertPublishEvent(int size);
 
+    AlertPublishEvent getAlertPublishEvent(String alertId);
+
     OpResult addAlertPublishEvent(AlertPublishEvent event);
 
     ScheduleState getScheduleState(String versionId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
index 7015a53..841a0a0 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
@@ -23,6 +23,7 @@ import 
org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
 import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +50,9 @@ public class MetadataUtils {
         if (t instanceof ScheduleState) {
             return ((ScheduleState) t).getVersion();
         }
+        if (t instanceof AlertPublishEvent) {
+            return ((AlertPublishEvent) t).getAlertId();
+        }
 
         try {
             Method m = t.getClass().getMethod("getName");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index 0de13f1..ad7d353 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -207,6 +207,16 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
+    public AlertPublishEvent getAlertPublishEvent(String alertId) {
+        Optional<AlertPublishEvent> op = alerts.stream().filter(alert -> 
alert.getAlertId().equals(alertId)).findAny();
+        if (op.isPresent()) {
+            return op.get();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
     public OpResult addAlertPublishEvent(AlertPublishEvent event) {
         alerts.add(event);
         OpResult result = new OpResult();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
index e484e09..d9580f1 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
@@ -42,8 +42,11 @@ public class JdbcDatabaseHandler {
     private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?";
     private static final String UPDATE_STATEMENT = "UPDATE %s set value=? 
WHERE id=?";
     private static final String QUERY_ALL_STATEMENT = "SELECT value FROM %s";
-    private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM 
%s WHERE id=?";
+    private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM 
%s WHERE id=%s";
     private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM 
%s ORDER BY id %s";
+    private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value 
FROM %s limit %s";
+
+    public enum SortType { DESC, ASC }
 
     private Map<String, String> tblNameMap = new HashMap<>();
 
@@ -151,58 +154,20 @@ public class JdbcDatabaseHandler {
 
     public <T> List<T> list(Class<T> clz) {
         String tb = getTableName(clz.getSimpleName());
-        List<T> result = new LinkedList<>();
-        try {
-            Statement statement = connection.createStatement();
-            ResultSet rs = 
statement.executeQuery(String.format(QUERY_ALL_STATEMENT, tb));
-            while (rs.next()) {
-                //String key = rs.getString(1);
-                String json = rs.getString(1);
-                try {
-                    T obj = mapper.readValue(json, clz);
-                    result.add(obj);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-            }
-            rs.close();
-            statement.close();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-        return result;
+        String query = String.format(QUERY_ALL_STATEMENT, tb);
+        return executeSelectStatement(clz, query);
+    }
+
+    public <T> List<T> listSubset(Class<T> clz, int size) {
+        String tb = getTableName(clz.getSimpleName());
+        String query = String.format(QUERY_ALL_STATEMENT_WITH_SIZE, tb, size);
+        return executeSelectStatement(clz, query);
     }
 
     public <T> T listWithFilter(String key, Class<T> clz) {
         String tb = getTableName(clz.getSimpleName());
-        List<T> result = new LinkedList<>();
-        PreparedStatement statement = null;
-        try {
-            statement = 
connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb));
-            statement.setString(1, key);
-            ResultSet rs = statement.executeQuery();
-            while (rs.next()) {
-                //String key = rs.getString(1);
-                String json = rs.getString(1);
-                try {
-                    T obj = mapper.readValue(json, clz);
-                    result.add(obj);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-            }
-            rs.close();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.warn("Close statement failed");
-                }
-            }
-        }
+        String query = String.format(QUERY_CONDITION_STATEMENT, tb, key);
+        List<T> result = executeSelectStatement(clz, query);
         if (result.isEmpty()) {
             return null;
         } else {
@@ -212,11 +177,21 @@ public class JdbcDatabaseHandler {
 
     public <T> T listTop(Class<T> clz, String sortType) {
         String tb = getTableName(clz.getSimpleName());
+        String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType);
+        List<T> result = executeSelectStatement(clz, query);
+        if (result.isEmpty()) {
+            return null;
+        } else {
+            return result.get(0);
+        }
+    }
+
+    public <T> List<T> executeSelectStatement(Class<T> clz, String query) {
+        String tb = getTableName(clz.getSimpleName());
         List<T> result = new LinkedList<>();
-        PreparedStatement statement = null;
         try {
-            statement = 
connection.prepareStatement(String.format(QUERY_ORDERBY_STATEMENT, tb, 
sortType));
-            ResultSet rs = statement.executeQuery();
+            Statement statement = connection.createStatement();
+            ResultSet rs = statement.executeQuery(query);
             while (rs.next()) {
                 //String key = rs.getString(1);
                 String json = rs.getString(1);
@@ -228,22 +203,11 @@ public class JdbcDatabaseHandler {
                 }
             }
             rs.close();
+            statement.close();
         } catch (SQLException e) {
             e.printStackTrace();
-        } finally {
-            if (statement != null) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    LOG.warn("Close statement failed");
-                }
-            }
-        }
-        if (result.isEmpty()) {
-            return null;
-        } else {
-            return result.get(0);
         }
+        return result;
     }
 
     public <T> OpResult remove(String clzName, String key) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index 0b595ac..adf26d8 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -74,7 +74,12 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        return null;
+        return handler.listSubset(AlertPublishEvent.class, size);
+    }
+
+    @Override
+    public AlertPublishEvent getAlertPublishEvent(String alertId) {
+        return handler.listWithFilter(alertId, AlertPublishEvent.class);
     }
 
     @Override
@@ -85,7 +90,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public ScheduleState getScheduleState() {
-        return handler.listTop(ScheduleState.class, "DESC");
+        return handler.listTop(ScheduleState.class, 
JdbcDatabaseHandler.SortType.DESC.toString());
     }
 
     @Override
@@ -110,7 +115,7 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return null;
+        return handler.addOrReplace(AlertPublishEvent.class.getSimpleName(), 
event);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index ab42e7d..f8186ac 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -125,6 +125,15 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             publishmentType.createIndex(doc1, io1);
         }
 
+        alerts = db.getCollection("alerts");
+        {
+            IndexOptions io1 = new 
IndexOptions().background(true).unique(true).name("alertIndex");
+            BsonDocument doc1 = new BsonDocument();
+            doc1.append("alertId", new BsonInt32(1));
+            alerts.createIndex(doc1, io1);
+        }
+
+
         // below is for schedule_specs and its splitted collections
         BsonDocument doc1 = new BsonDocument();
         IndexOptions io1 = new 
IndexOptions().background(true).name("versionIndex");
@@ -179,6 +188,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             filter.append("streamId", new BsonString(MetadataUtils.getKey(t)));
         } else if (t instanceof PublishmentType) {
             filter.append("type", new BsonString(MetadataUtils.getKey(t)));
+        } else if (t instanceof AlertPublishEvent) {
+            filter.append("alertId", new BsonString(MetadataUtils.getKey(t)));
         } else {
             filter.append("name", new BsonString(MetadataUtils.getKey(t)));
         }
@@ -307,12 +318,22 @@ public class MongoMetadataDaoImpl implements IMetadataDao 
{
 
     @Override
     public List<AlertPublishEvent> listAlertPublishEvent(int size) {
+        return list(alerts, AlertPublishEvent.class);
+    }
+
+    @Override
+    public AlertPublishEvent getAlertPublishEvent(String alertId) {
+        List<AlertPublishEvent> results = list(alerts, 
AlertPublishEvent.class);
+        Optional<AlertPublishEvent> op = results.stream().filter(alert -> 
alert.getAlertId().equals(alertId)).findAny();
+        if (op.isPresent()) {
+            return op.get();
+        }
         return null;
     }
 
     @Override
     public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return null;
+        return addOrReplace(alerts, event);
     }
 
     private <T> OpResult addOne(MongoCollection<Document> collection, T t) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e5cb719/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index 7f08133..ff83b80 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -32,6 +32,7 @@ import 
org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
 import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl;
 import org.apache.eagle.alert.metadata.resource.OpResult;
@@ -206,6 +207,16 @@ public class MongoImplTest {
             assigns = dao.listStreams();
             Assert.assertEquals(0, assigns.size());
         }
+        // alert
+        {
+            AlertPublishEvent alert = new AlertPublishEvent();
+            alert.setAlertTimestamp(System.currentTimeMillis());
+            alert.setAlertId(UUID.randomUUID().toString());
+            OpResult result = dao.addAlertPublishEvent(alert);
+            Assert.assertEquals(200, result.code);
+            List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2);
+            Assert.assertEquals(1, alerts.size());
+        }
     }
 
     private void test_addstate() {

Reply via email to