[EAGLE-630] Add Publishments to Policy API
Path: POST /metadata/policies/{policyId}/publishments/ Data: String array, like: [ "publishmentName1", "publishmentName2", ..., "publishmentNameN" ] Author: Hao Chen <h...@apache.org> Closes #518 from haoch/EAGLE-630. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/eda6e586 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/eda6e586 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/eda6e586 Branch: refs/heads/master Commit: eda6e586bb598d443e11bbf6db6a72ca966d0b9b Parents: a710082 Author: Hao Chen <h...@apache.org> Authored: Mon Oct 17 14:06:04 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Mon Oct 17 14:06:04 2016 +0800 ---------------------------------------------------------------------- .../metadata/resource/MetadataResource.java | 43 ++++++++++++++++++++ .../eagle/alert/metadata/IMetadataDao.java | 19 +++++++++ 2 files changed, 62 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eda6e586/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index d540fb5..32a978a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -17,6 +17,7 @@ package org.apache.eagle.service.metadata.resource; import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; @@ -30,8 +31,10 @@ import com.google.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import javax.ws.rs.*; @@ -231,6 +234,46 @@ public class MetadataResource { ).collect(Collectors.toList()); } + @Path("/policies/{policyId}/publishments") + @POST + public OpResult addPublishmentsToPolicy(@PathParam("policyId") String policyId, List<String> publishmentIds) { + OpResult result = new OpResult(); + try { + getPolicyByID(policyId); + Map<String,Publishment> publishmentMap = new HashMap<>(); + listPublishment().forEach((pub) -> publishmentMap.put(pub.getName(),pub)); + for (String publishmentId : publishmentIds) { + if (publishmentMap.containsKey(publishmentId)) { + Publishment publishment = publishmentMap.get(publishmentId); + if (publishment.getPolicyIds().contains(policyId)) { + LOG.warn("Policy {} was already bound with publisher {}",policyId, publishmentId); + } else { + publishment.getPolicyIds().add(policyId); + } + OpResult opResult = addPublishment(publishment); + if (opResult.code == OpResult.FAILURE) { + LOG.error("Failed to add publisher {} to policy {}: {}", publishmentId, policyId, opResult.message); + return opResult; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(opResult.message); + } + } + } else { + throw new IllegalArgumentException("Publishsment (name: " + 
publishmentId + ") not found"); + } + } + result.code = OpResult.SUCCESS; + result.message = "Successfully add " + publishmentIds.size() + " publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId; + LOG.info(result.message); + } catch (Exception ex) { + result.code = OpResult.FAILURE; + result.message = "Failed to add publishments: [" + StringUtils.join(publishmentIds,",") + "] to policy: " + policyId + ", cause: " + ex.getMessage(); + LOG.error(result.message,ex); + } + return result; + } + @Path("/policies/{policyId}") @GET public PolicyDefinition getPolicyByID(@PathParam("policyId") String policyId) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/eda6e586/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java index d245afd..06e96c7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.alert.metadata; +import com.google.common.base.Preconditions; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; @@ -23,9 +24,13 @@ import org.apache.eagle.alert.coordination.model.internal.Topology; import 
org.apache.eagle.alert.engine.coordinator.*; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.HashMap; import java.util.List; +import java.util.Map; public interface IMetadataDao extends Closeable { @@ -63,6 +68,7 @@ public interface IMetadataDao extends Closeable { OpResult addPublishment(Publishment publishment); + OpResult removePublishment(String pubId); List<PublishmentType> listPublishmentType(); @@ -88,4 +94,17 @@ public interface IMetadataDao extends Closeable { OpResult importModels(Models models); + // ----------------------------------------------------------- + // Extended Metadata DAO Methods with default implementation + // ----------------------------------------------------------- + + Logger LOG = LoggerFactory.getLogger(IMetadataDao.class); + + default PolicyDefinition getPolicyByID(String policyId) { + Preconditions.checkNotNull(policyId,"policyId"); + return listPolicies().stream().filter(pc -> pc.getName().equals(policyId)).findAny().orElseGet(() -> { + LOG.error("Policy (policyId " + policyId + ") not found"); + throw new IllegalArgumentException("Policy (policyId " + policyId + ") not found"); + }); + } }