Repository: eagle
Updated Branches:
  refs/heads/master f20bf0eb3 -> ba79b5485


[EAGLE-1059] fix a bug in PolicyResource.java

https://issues.apache.org/jira/browse/EAGLE-1059

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #965 from qingwen220/EAGLE-1059.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ba79b548
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ba79b548
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ba79b548

Branch: refs/heads/master
Commit: ba79b5485622b9de92373d612631d1b96a245287
Parents: f20bf0e
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Mon Aug 7 10:56:55 2017 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Mon Aug 7 10:56:55 2017 +0800

----------------------------------------------------------------------
 .../impl/AlertBoltOutputCollectorWrapper.java   | 38 ++++++++------
 .../eagle/metadata/resource/PolicyResource.java | 24 +++++++++
 .../metadata/utils/PolicyIdConversions.java     | 20 ++++---
 .../dev/partials/alert/policyPrototypes.html    | 55 ++++++++++----------
 .../app/dev/public/js/ctrls/alertEditCtrl.js    | 37 +++++++------
 5 files changed, 103 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 606ddce..4c749f4 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -49,28 +49,32 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
 
     @Override
     public void emit(AlertStreamEvent event) {
+        if (event == null) {
+            return;
+        }
+        event.ensureAlertId();
         Set<PublishPartition> clonedPublishPartitions = new 
HashSet<>(publishPartitions);
         for (PublishPartition publishPartition : clonedPublishPartitions) {
             // skip the publish partition which is not belong to this policy 
and also check streamId
             PublishPartition cloned = publishPartition.clone();
             Optional.ofNullable(event)
-                .filter(x -> x != null
-                    && x.getSchema() != null
-                    && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
-                    && 
(cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
-                    || 
cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
-                .ifPresent(x -> {
-                    cloned.getColumns().stream()
-                        .filter(y -> event.getSchema().getColumnIndex(y) >= 0
-                            && event.getSchema().getColumnIndex(y) < 
event.getSchema().getColumns().size())
-                        .map(y -> 
event.getData()[event.getSchema().getColumnIndex(y)])
-                        .filter(y -> y != null)
-                        .forEach(y -> cloned.getColumnValues().add(y));
-                    synchronized (outputLock) {
-                        streamContext.counter().incr("alert_count");
-                        delegate.emit(Arrays.asList(cloned, event));
-                    }
-                });
+                    .filter(x -> x != null
+                            && x.getSchema() != null
+                            && 
cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId())
+                            && 
(cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId())
+                            || 
cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT)))
+                    .ifPresent(x -> {
+                        cloned.getColumns().stream()
+                                .filter(y -> 
event.getSchema().getColumnIndex(y) >= 0
+                                        && event.getSchema().getColumnIndex(y) 
< event.getSchema().getColumns().size())
+                                .map(y -> 
event.getData()[event.getSchema().getColumnIndex(y)])
+                                .filter(y -> y != null)
+                                .forEach(y -> cloned.getColumnValues().add(y));
+                        synchronized (outputLock) {
+                            streamContext.counter().incr("alert_count");
+                            delegate.emit(Arrays.asList(cloned, event));
+                        }
+                    });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
index d09da4b..9edfc3e 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 import org.apache.eagle.common.rest.RESTResponse;
@@ -72,6 +73,7 @@ public class PolicyResource {
             Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), 
"alert publisher list should not be null");
 
             PolicyDefinition policyDefinition = policyEntity.getDefinition();
+            checkOutputStream(policyDefinition.getInputStreams(), 
policyDefinition.getOutputStreams());
             OpResult result = metadataResource.addPolicy(policyDefinition);
             if (result.code != 200) {
                 throw new IllegalArgumentException(result.message);
@@ -153,6 +155,7 @@ public class PolicyResource {
 
     private PolicyEntity importPolicyProto(PolicyEntity policyEntity) {
         PolicyDefinition policyDefinition = policyEntity.getDefinition();
+        checkOutputStream(policyDefinition.getInputStreams(), 
policyDefinition.getOutputStreams());
         List<String> inputStreamType = new ArrayList<>();
         String newDefinition = policyDefinition.getDefinition().getValue();
         for (String inputStream : policyDefinition.getInputStreams()) {
@@ -164,6 +167,7 @@ public class PolicyResource {
         policyDefinition.getDefinition().setValue(newDefinition);
         
policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(),
 policyDefinition.getName()));
         policyDefinition.setSiteId(null);
+        policyDefinition.getPartitionSpec().clear();
         policyEntity.setDefinition(policyDefinition);
         return policyEntityService.createOrUpdatePolicyProto(policyEntity);
     }
@@ -186,10 +190,19 @@ public class PolicyResource {
             policyDefinition.getDefinition().setValue(newDefinition);
             policyDefinition.setSiteId(site);
             
policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, 
policyProto.getDefinition().getName()));
+
             PolicyValidationResult validationResult = 
metadataResource.validatePolicy(policyDefinition);
             if (!validationResult.isSuccess() || 
validationResult.getException() != null) {
                 throw new 
IllegalArgumentException(validationResult.getException());
             }
+
+            policyDefinition.getPartitionSpec().clear();
+            for (StreamPartition sd : 
validationResult.getPolicyExecutionPlan().getStreamPartitions()) {
+                if (inputStreams.contains(sd.getStreamId())) {
+                    policyDefinition.getPartitionSpec().add(sd);
+                }
+            }
+
             OpResult result = metadataResource.addPolicy(policyDefinition);
             if (result.code != 200) {
                 throw new IllegalArgumentException("fail to create policy: " + 
result.message);
@@ -204,4 +217,15 @@ public class PolicyResource {
         return true;
     }
 
+    private void checkOutputStream(List<String> inputStreams, List<String> 
outputStreams) {
+        for (String inputStream : inputStreams) {
+            for (String outputStream : outputStreams) {
+                if (outputStream.contains(inputStream)) {
+                    throw new IllegalArgumentException("OutputStream name 
should not contains string: " + inputStream
+                            + ". Please rename your OutputStream name");
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
index c9ccadc..2012a40 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
@@ -22,17 +22,21 @@ import com.google.common.base.Preconditions;
 public class PolicyIdConversions {
 
     public static String generateUniquePolicyId(String siteId, String 
policyName) {
-        return String.format("%s_%s", policyName, siteId);
+        String subffix = String.format("_%s", siteId.toLowerCase());
+        if (policyName.toLowerCase().endsWith(subffix)) {
+            return policyName;
+        }
+        return String.format("%s_%s", policyName, siteId.toLowerCase());
     }
 
-    public static String parsePolicyId(String siteId, String 
generatedUniquePolicyId) {
-        String subffix = String.format("_%s", siteId);
-        if (generatedUniquePolicyId.endsWith(subffix)) {
-            int streamTypeIdLength = generatedUniquePolicyId.length() - 
subffix.length();
-            Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid 
policyId: " + generatedUniquePolicyId + ", policyId is empty");
-            return generatedUniquePolicyId.substring(0, streamTypeIdLength);
+    public static String parsePolicyId(String siteId, String policyName) {
+        String subffix = String.format("_%s", siteId.toLowerCase());
+        if (policyName.toLowerCase().endsWith(subffix)) {
+            int streamTypeIdLength = policyName.length() - subffix.length();
+            Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid 
policyId: " + policyName + ", policyId is empty");
+            return policyName.substring(0, streamTypeIdLength);
         } else {
-            return generatedUniquePolicyId;
+            return policyName;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
index 9f22ce4..0798475 100644
--- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
+++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html
@@ -27,35 +27,36 @@
                <div sort-table="prototypeList">
                        <table class="table table-bordered table-hover">
                                <thead>
-                                       <tr>
-                                               <th>
-                                                       <input type="checkbox" 
ng-checked="getCheckedList().length === prototypeList.length && 
prototypeList.length !== 0" ng-click="doCheckAll()" />
-                                               </th>
-                                               <th>Name</th>
-                                               <th>Definition</th>
-                                               <th>Publishers</th>
-                                               <th width="110">Operation</th>
-                                       </tr>
+                               <tr>
+                                       <th>
+                                               <input type="checkbox" 
ng-checked="getCheckedList().length === prototypeList.length && 
prototypeList.length !== 0" ng-click="doCheckAll()" />
+                                       </th>
+                                       <th>Name</th>
+                                       <th>Definition</th>
+                                       <!--<th>Publishers</th> -->
+                                       <th width="110">Operation</th>
+                               </tr>
                                </thead>
                                <tbody>
-                                       <tr>
-                                               <td>
-                                                       <input type="checkbox" 
ng-checked="checkedPrototypes[item.name]" 
ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
-                                               </td>
-                                               <td>{{item.name}}</td>
-                                               
<td><pre>{{item.definition.definition.value}}</pre></td>
-                                               <td>
-                                                       <ul class="no-margin">
-                                                               <li 
ng-repeat="publisher in item.alertPublishmentIds track by $index">
-                                                                       
{{publisher}}
-                                                               </li>
-                                                       </ul>
-                                               </td>
-                                               <td class="text-center">
-                                                       <button class="btn 
btn-xs btn-primary" ng-click="createPolicy([item])">Export</button>
-                                                       <button class="btn 
btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button>
-                                               </td>
-                                       </tr>
+                               <tr>
+                                       <td>
+                                               <input type="checkbox" 
ng-checked="checkedPrototypes[item.name]" 
ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" />
+                                       </td>
+                                       <td>{{item.name}}</td>
+                                       
<td><pre>{{item.definition.definition.value}}</pre></td>
+                                       <!--
+                                       <td>
+                                               <ul class="no-margin">
+                                                       <li 
ng-repeat="publisher in item.alertPublishmentIds track by $index">
+                                                               {{publisher}}
+                                                       </li>
+                                               </ul>
+                                       </td> -->
+                                       <td class="text-center">
+                                               <button class="btn btn-xs 
btn-primary" ng-click="createPolicy([item])">Export</button>
+                                               <button class="btn btn-xs 
btn-danger" ng-click="deletePrototype(item)">Delete</button>
+                                       </td>
+                               </tr>
                                </tbody>
                        </table>
                </div>

http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
index e4623b2..fc2b494 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
@@ -253,12 +253,12 @@
                };
 
                /*$scope.checkInputStream = function (streamId) {
-                       if($scope.isInputStreamSelected(streamId)) {
-                               $scope.policy.inputStreams = 
common.array.remove(streamId, $scope.policy.inputStreams);
-                       } else {
-                               $scope.policy.inputStreams.push(streamId);
-                       }
-               };*/
+                if($scope.isInputStreamSelected(streamId)) {
+                $scope.policy.inputStreams = common.array.remove(streamId, 
$scope.policy.inputStreams);
+                } else {
+                $scope.policy.inputStreams.push(streamId);
+                }
+                };*/
 
                // 
==============================================================
                // =                         Definition                         
=
@@ -558,7 +558,18 @@
                                                });
                                        }
 
-                                       policyPromise._then(function () {
+                                       policyPromise._then(function (res) {
+                                               var validate = res.data;
+                                               if (!validate.success) {
+                                                       $.dialog({
+                                                               title: "OPS",
+                                                               content: 
"Create policy failed: " + (res.data.message || res.data.errors)
+                                                       });
+                                                       $scope.policyLock = 
false;
+                                                       $scope.saveLock = false;
+                                                       return;
+                                               }
+
                                                console.log("Create policy 
success...");
                                                $.dialog({
                                                        title: "Done",
@@ -566,18 +577,6 @@
                                                }, function () {
                                                        
$wrapState.go("policyDetail", {name: $scope.policy.name, siteId: 
$scope.policy.siteId});
                                                });
-                                       }, function (res) {
-                                               var errormsg = "";
-                                               if(typeof res.data.message !== 
'undefined') {
-                                                       errormsg = 
res.data.message;
-                                               } else {
-                                                       errormsg = 
res.data.errors;
-                                               }
-                                               $.dialog({
-                                                       title: "OPS",
-                                                       content: "Create policy 
failed: " + errormsg
-                                               });
-                                               $scope.policyLock = false;
                                        });
                                }, function (args) {
                                        $.dialog({

Reply via email to