[EAGLE-630] Add Publishments to Policy API

Path:

    POST /metadata/policies/{policyId}/publishments/

Data: String array, like:

    [
       "publishmentName1", "publishmentName2",.., "publishmentNameN"
    ]

Author: Hao Chen <h...@apache.org>

Closes #518 from haoch/EAGLE-630.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eda6e586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eda6e586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eda6e586

Branch: refs/heads/master
Commit: eda6e586bb598d443e11bbf6db6a72ca966d0b9b
Parents: a710082
Author: Hao Chen <h...@apache.org>
Authored: Mon Oct 17 14:06:04 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Mon Oct 17 14:06:04 2016 +0800

----------------------------------------------------------------------
 .../metadata/resource/MetadataResource.java     | 43 ++++++++++++++++++++
 .../eagle/alert/metadata/IMetadataDao.java      | 19 +++++++++
 2 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eda6e586/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index d540fb5..32a978a 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.service.metadata.resource;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -30,8 +31,10 @@ import com.google.inject.Inject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import javax.ws.rs.*;
 
@@ -231,6 +234,46 @@ public class MetadataResource {
         ).collect(Collectors.toList());
     }
 
+    @Path("/policies/{policyId}/publishments")
+    @POST
+    public OpResult addPublishmentsToPolicy(@PathParam("policyId") String 
policyId, List<String> publishmentIds) {
+        OpResult result = new OpResult();
+        try {
+            getPolicyByID(policyId);
+            Map<String,Publishment> publishmentMap = new HashMap<>();
+            listPublishment().forEach((pub) -> 
publishmentMap.put(pub.getName(),pub));
+            for (String publishmentId : publishmentIds) {
+                if (publishmentMap.containsKey(publishmentId)) {
+                    Publishment publishment = 
publishmentMap.get(publishmentId);
+                    if (publishment.getPolicyIds().contains(policyId)) {
+                        LOG.warn("Policy {} was already bound with publisher 
{}",policyId, publishmentId);
+                    } else {
+                        publishment.getPolicyIds().add(policyId);
+                    }
+                    OpResult opResult = addPublishment(publishment);
+                    if (opResult.code == OpResult.FAILURE) {
+                        LOG.error("Failed to add publisher {} to policy {}: 
{}", publishmentId, policyId, opResult.message);
+                        return opResult;
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(opResult.message);
+                        }
+                    }
+                } else {
+                    throw new IllegalArgumentException("Publishsment (name: " 
+ publishmentId + ") not found");
+                }
+            }
+            result.code = OpResult.SUCCESS;
+            result.message = "Successfully add " + publishmentIds.size() + " 
publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + 
policyId;
+            LOG.info(result.message);
+        } catch (Exception ex) {
+            result.code = OpResult.FAILURE;
+            result.message = "Failed to add publishments: [" + 
StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " 
+ ex.getMessage();
+            LOG.error(result.message,ex);
+        }
+        return result;
+    }
+
     @Path("/policies/{policyId}")
     @GET
     public PolicyDefinition getPolicyByID(@PathParam("policyId") String 
policyId) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eda6e586/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index d245afd..06e96c7 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.metadata;
 
+import com.google.common.base.Preconditions;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -23,9 +24,13 @@ import 
org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public interface IMetadataDao extends Closeable {
 
@@ -63,6 +68,7 @@ public interface IMetadataDao extends Closeable {
 
     OpResult addPublishment(Publishment publishment);
 
+
     OpResult removePublishment(String pubId);
 
     List<PublishmentType> listPublishmentType();
@@ -88,4 +94,17 @@ public interface IMetadataDao extends Closeable {
 
     OpResult importModels(Models models);
 
+    // -----------------------------------------------------------
+    //  Extended Metadata DAO Methods with default implementation
+    // -----------------------------------------------------------
+
+    Logger LOG = LoggerFactory.getLogger(IMetadataDao.class);
+
+    default PolicyDefinition getPolicyByID(String policyId) {
+        Preconditions.checkNotNull(policyId,"policyId");
+        return listPolicies().stream().filter(pc -> 
pc.getName().equals(policyId)).findAny().orElseGet(() -> {
+            LOG.error("Policy (policyId " + policyId + ") not found");
+            throw new IllegalArgumentException("Policy (policyId " + policyId 
+ ") not found");
+        });
+    }
 }

Reply via email to