Repository: falcon Updated Branches: refs/heads/master 3f5087997 -> 49fa46e29
FALCON-2191 Extension Job store changes and REST API changes for submit and submitAndSchedule extension services. Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #307 from sandeepSamudrala/FALCON-2191 and squashes the following commits: 2928bce [sandeep] FALCON-2191. Rebasing the patch 2c14f41 [sandeep] FALCON-2191 Incorporated review comments. modified validate method to check for the entity being process and feed only 5da9141 [sandeep] FALCON-2191 Incorporated review comments ca45a2e [sandeep] FALCON-2191 Extension Job store changes and REST API changes for submit and submitAndSchedule extension services 9487132 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2191 fd2357b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190 8aacd75 [sandeep] FALCON-2183 Incorporated review comments f3d7268 [sandeep] FALCON-2183 Incorporated review comments 11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. 
Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49fa46e2 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49fa46e2 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49fa46e2 Branch: refs/heads/master Commit: 49fa46e29540a869c3a669f779f662f5543b8d10 Parents: 3f50879 Author: sandeep <[email protected]> Authored: Fri Nov 25 17:09:32 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Nov 25 17:09:32 2016 +0530 ---------------------------------------------------------------------- .../falcon/persistence/ExtensionBean.java | 117 +++++++++++++++ .../falcon/persistence/ExtensionJobsBean.java | 145 +++++++++++++++++++ .../persistence/ExtensionMetadataBean.java | 113 --------------- .../persistence/PersistenceConstants.java | 9 ++ .../falcon/tools/FalconStateStoreDBCLI.java | 3 +- .../src/main/resources/META-INF/persistence.xml | 16 +- .../extensions/jdbc/ExtensionMetaStore.java | 89 +++++++++--- .../falcon/extensions/store/ExtensionStore.java | 47 +++--- .../extensions/jdbc/ExtensionMetaStoreTest.java | 39 +++-- .../extensions/store/ExtensionStoreTest.java | 33 +++-- pom.xml | 6 + prism/pom.xml | 11 +- .../resource/extensions/ExtensionManager.java | 115 ++++++++++----- src/build/findbugs-exclude.xml | 7 +- 14 files changed, 519 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java new file mode 100644 index 0000000..2cade5b --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java @@ -0,0 
+1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + +import org.apache.falcon.extensions.ExtensionType; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.util.Date; + + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Table to store extensions. 
+ */ + +@Table(name = "EXTENSIONS") +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "), + @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "), + @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "), + @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName") +}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class ExtensionBean { + @Basic + @NotNull + @Id + @Column(name = "extension_name") + private String extensionName; + + @Basic + @NotNull + @Column(name = "extension_type") + @Enumerated(EnumType.STRING) + private ExtensionType extensionType; + + @Basic + @Column(name = "description") + private String description; + + @Basic + @NotNull + @Column(name = "location") + private String location; + + @Basic + @NotNull + @Column(name = "creation_time") + private Date creationTime; + + public ExtensionType getExtensionType() { + return extensionType; + } + + public void setExtensionType(ExtensionType extensionType) { + this.extensionType = extensionType; + } + + public Date getCreationTime() { + return creationTime; + } + + public void setCreationTime(Date creationTime) { + this.creationTime = creationTime; + } + + + public String getExtensionName() { + return extensionName; + } + + public void setExtensionName(String extensionName) { + this.extensionName = extensionName; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java new file mode 100644 index 0000000..2dc66f8 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +/** + * Table to store extension jobs. 
+ */ + +@Table(name = "EXTENSION_JOBS") +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "), + @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "), + @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName") +}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class ExtensionJobsBean { + + @Basic + @NotNull + @Id + @Column(name = "job_name") + private String jobName; + + @Basic + @NotNull + @Column(name = "extension_name") + private String extensionName; + + @Basic + @NotNull + @Column(name = "feeds") + private String[] feeds; + + @Basic + @NotNull + @Column(name = "processes") + private String[] processes; + + @Lob + @Basic(fetch= FetchType.LAZY) + @Column(name = "config") + private byte[] config; + + + @Basic + @NotNull + @Column(name = "creation_time") + private Date creationTime; + + @Basic + @NotNull + @Column(name = "last_updated_time") + private Date lastUpdatedTime; + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public Date getCreationTime() { + return creationTime; + } + + public void setCreationTime(Date creationTime) { + this.creationTime = creationTime; + } + + + public byte[] getConfig() { + return config; + } + + public void setConfig(byte[] config) { + this.config = config; + } + + public String getExtensionName() { + return extensionName; + } + + public void setExtensionName(String extensionName) { + this.extensionName = extensionName; + } + + public Date getLastUpdatedTime() { + return lastUpdatedTime; + } + + public void setLastUpdatedTime(Date lastUpdatedTime) { + this.lastUpdatedTime = lastUpdatedTime; + } + + public List<String> getFeeds() { + return Arrays.asList(feeds); + } + + public void 
setFeeds(List<String> feeds) { + this.feeds = feeds.toArray(new String[feeds.size()]); + } + + public List<String> getProcesses() { + return Arrays.asList(processes); + } + + public void setProcesses(List<String> processes) { + this.processes = processes.toArray(new String[processes.size()]); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java deleted file mode 100644 index 9f4cf72..0000000 --- a/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.falcon.persistence; - -import javax.persistence.Basic; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.Table; -import javax.validation.constraints.NotNull; -import java.util.Date; - - -//SUSPEND CHECKSTYLE CHECK LineLengthCheck -/** - * Table to store extension metadata. - */ - -@Table(name = "EXTENSION_METADATA") -@Entity -@NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionMetadataBean a "), - @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionMetadataBean a where a.extensionType = :extensionType "), - @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionMetadataBean a where a.extensionName = :extensionName "), - @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionMetadataBean a where a.extensionName = :extensionName") -}) -//RESUME CHECKSTYLE CHECK LineLengthCheck -public class ExtensionMetadataBean { - @Basic - @NotNull - @Id - @Column(name = "extension_name") - private String extensionName; - - - @Basic - @NotNull - @Column(name = "extension_type") - private String extensionType; - - @Basic - @Column(name = "description") - private String description; - - @Basic - @NotNull - @Column(name = "location") - private String location; - - - @Basic - @NotNull - @Column(name = "creation_time") - private Date creationTime; - - public String getExtensionType() { - return extensionType; - } - - public void setExtensionType(String extensionType) { - this.extensionType = extensionType; - } - - public Date getCreationTime() { - return creationTime; - } - - public void setCreationTime(Date creationTime) { - this.creationTime = creationTime; - } - - - public String getExtensionName() { - return extensionName; - } - 
- public void setExtensionName(String extensionName) { - this.extensionName = extensionName; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 94eb32e..fc82ae7 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -36,6 +36,7 @@ public final class PersistenceConstants { public static final String DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY = "DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; + public static final String GET_ENTITY = "GET_ENTITY"; public static final String GET_ENTITY_FOR_STATE = "GET_ENTITY_FOR_STATE"; public static final String UPDATE_ENTITY = "UPDATE_ENTITY"; @@ -57,18 +58,26 @@ public final class PersistenceConstants { public static final String GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER"; public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE"; public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; + public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; public static final String GET_ENTITY_ALERTS = 
"GET_ENTITY_ALERTS"; public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; + public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE"; public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE"; public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES"; public static final String DELETE_ALL_BACKLOG_ENTITY_INSTANCES ="DELETE_ALL_BACKLOG_ENTITY_INSTANCES"; + public static final String GET_ALL_EXTENSIONS = "GET_ALL_EXTENSIONS"; public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE"; public static final String DELETE_EXTENSION = "DELETE_EXTENSION"; public static final String GET_EXTENSION = "GET_EXTENSION"; + + public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS"; + public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB"; + public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB"; + } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index e12b982..0c04da3 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -247,7 +247,8 @@ public class FalconStateStoreDBCLI { args.add("org.apache.falcon.persistence.MonitoredEntityBean"); args.add("org.apache.falcon.persistence.EntitySLAAlertBean"); 
args.add("org.apache.falcon.persistence.BacklogMetricBean"); - args.add("org.apache.falcon.persistence.ExtensionMetadataBean"); + args.add("org.apache.falcon.persistence.ExtensionBean"); + args.add("org.apache.falcon.persistence.ExtensionJobsBean"); return args.toArray(new String[args.size()]); } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index 1fbcc9d..0f20103 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -29,7 +29,8 @@ <class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> <class>org.apache.falcon.persistence.BacklogMetricBean</class> - <class>org.apache.falcon.persistence.ExtensionMetadataBean</class> + <class>org.apache.falcon.persistence.ExtensionBean</class> + <class>org.apache.falcon.persistence.ExtensionJobsBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -40,7 +41,7 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.ExtensionMetadataBean)"></property> + org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -64,7 +65,8 @@ <class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> 
<class>org.apache.falcon.persistence.BacklogMetricBean</class> - <class>org.apache.falcon.persistence.ExtensionMetadataBean</class> + <class>org.apache.falcon.persistence.ExtensionBean</class> + <class>org.apache.falcon.persistence.ExtensionJobsBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -74,7 +76,7 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.ExtensionMetadataBean)"></property> + org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -97,7 +99,8 @@ <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> <class>org.apache.falcon.persistence.BacklogMetricBean</class> - <class>org.apache.falcon.persistence.ExtensionMetadataBean</class> + <class>org.apache.falcon.persistence.ExtensionBean</class> + <class>org.apache.falcon.persistence.ExtensionJobsBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -107,7 +110,8 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionMetadataBean)"/> + org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionBean; + org.apache.falcon.persistence.ExtensionJobsBean)"/> <property 
name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> <property name="openjpa.ReadLockLevel" value="read"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java index 0a1a0e7..5501146 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java @@ -18,7 +18,8 @@ package org.apache.falcon.extensions.jdbc; import org.apache.falcon.extensions.ExtensionType; -import org.apache.falcon.persistence.ExtensionMetadataBean; +import org.apache.falcon.persistence.ExtensionBean; +import org.apache.falcon.persistence.ExtensionJobsBean; import org.apache.falcon.persistence.PersistenceConstants; import org.apache.falcon.service.FalconJPAService; @@ -32,39 +33,43 @@ import java.util.List; */ public class ExtensionMetaStore { + private static final String EXTENSION_NAME = "extensionName"; + private static final String JOB_NAME = "jobName"; + private static final String EXTENSION_TYPE = "extensionType"; + private EntityManager getEntityManager() { return FalconJPAService.get().getEntityManager(); } - public void storeExtensionMetadataBean(String extensionName, String location, ExtensionType extensionType, - String description){ - ExtensionMetadataBean extensionMetadataBean = new ExtensionMetadataBean(); - extensionMetadataBean.setLocation(location); - extensionMetadataBean.setExtensionName(extensionName); - extensionMetadataBean.setExtensionType(extensionType.toString()); - extensionMetadataBean.setCreationTime(new 
Date(System.currentTimeMillis())); - extensionMetadataBean.setDescription(description); + public void storeExtensionBean(String extensionName, String location, ExtensionType extensionType, + String description){ + ExtensionBean extensionBean = new ExtensionBean(); + extensionBean.setLocation(location); + extensionBean.setExtensionName(extensionName); + extensionBean.setExtensionType(extensionType); + extensionBean.setCreationTime(new Date(System.currentTimeMillis())); + extensionBean.setDescription(description); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); - entityManager.persist(extensionMetadataBean); + entityManager.persist(extensionBean); } finally { commitAndCloseTransaction(entityManager); } } - public Boolean checkIfExtensionExists(String extensionName){ + public Boolean checkIfExtensionExists(String extensionName) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION); - q.setParameter("extensionName", extensionName); + q.setParameter(EXTENSION_NAME, extensionName); if (q.getResultList().size() > 0){ return true; } return false; } - public List<ExtensionMetadataBean> getAllExtensions(){ + public List<ExtensionBean> getAllExtensions() { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS); @@ -75,11 +80,11 @@ public class ExtensionMetaStore { } } - public void deleteExtensionsOfType(ExtensionType extensionType){ + public void deleteExtensionsOfType(ExtensionType extensionType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE); - q.setParameter("extensionType", extensionType.toString()); + q.setParameter(EXTENSION_TYPE, extensionType); try{ q.executeUpdate(); } finally { @@ 
-87,23 +92,23 @@ public class ExtensionMetaStore { } } - public ExtensionMetadataBean getDetail(String extensionName){ + public ExtensionBean getDetail(String extensionName) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION); - q.setParameter("extensionName", extensionName); + q.setParameter(EXTENSION_NAME, extensionName); try { - return (ExtensionMetadataBean)q.getSingleResult(); + return (ExtensionBean)q.getSingleResult(); } finally { commitAndCloseTransaction(entityManager); } } - public void deleteExtensionMetadata(String extensionName){ + public void deleteExtension(String extensionName){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION); - q.setParameter("extensionName", extensionName); + q.setParameter(EXTENSION_NAME, extensionName); try{ q.executeUpdate(); } finally { @@ -111,6 +116,50 @@ public class ExtensionMetaStore { } } + public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes, + byte[] config) { + ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean(); + Date currentTime = new Date(System.currentTimeMillis()); + extensionJobsBean.setJobName(jobName); + extensionJobsBean.setExtensionName(extensionName); + extensionJobsBean.setCreationTime(currentTime); + extensionJobsBean.setFeeds(feeds); + extensionJobsBean.setProcesses(processes); + extensionJobsBean.setConfig(config); + extensionJobsBean.setLastUpdatedTime(currentTime); + EntityManager entityManager = getEntityManager(); + try { + beginTransaction(entityManager); + entityManager.persist(extensionJobsBean); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public void deleteExtensionJob(String jobName) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query 
query = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION_JOB); + query.setParameter(JOB_NAME, jobName); + try{ + query.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public List<ExtensionJobsBean> getAllExtensionJobs() { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS); + try { + return q.getResultList(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + private void beginTransaction(EntityManager entityManager) { entityManager.getTransaction().begin(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java index e15919f..832d5b7 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java @@ -20,38 +20,35 @@ package org.apache.falcon.extensions.store; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.extensions.AbstractExtension; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; - -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.entity.store.StoreAccessException; - import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.HashMap; - -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Store for Falcon extensions. 
@@ -106,10 +103,10 @@ public final class ExtensionStore { String description = getShortDescription(extension); String recipeName = extension; String location = storePath.toString() + '/' + extension; - metaStore.storeExtensionMetadataBean(recipeName, location, extensionType, description); + metaStore.storeExtensionBean(recipeName, location, extensionType, description); } } catch (FalconException e){ - LOG.error("Exception in ExtensionStore:", e); + LOG.error("Exception in ExtensionMetaStore:", e); throw new RuntimeException(e); } @@ -239,36 +236,36 @@ public final class ExtensionStore { } public List<String> getExtensions() throws StoreAccessException { - List<String> extesnionList = new ArrayList<>(); + List<String> extensionList = new ArrayList<>(); try { FileStatus[] fileStatuses = fs.listStatus(storePath); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isDirectory()) { Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); - extesnionList.add(filePath.getName()); + extensionList.add(filePath.getName()); } } } catch (IOException e) { throw new StoreAccessException(e); } - return extesnionList; + return extensionList; } - public String deleteExtensionMetadata(final String extensionName) throws ValidationException{ + public String deleteExtension(final String extensionName) throws ValidationException{ ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? 
ExtensionType.TRUSTED : ExtensionType.CUSTOM; if (extensionType.equals(ExtensionType.TRUSTED)){ throw new ValidationException(extensionName + " is trusted cannot be deleted."); } if (metaStore.checkIfExtensionExists(extensionName)) { - metaStore.deleteExtensionMetadata(extensionName); + metaStore.deleteExtension(extensionName); return "Deleted extension:" + extensionName; }else { return "Extension:" + extensionName + " is not registered with Falcon."; } } - public String registerExtensionMetadata(final String extensionName, final String path, final String description) + public String registerExtension(final String extensionName, final String path, final String description) throws URISyntaxException, FalconException { Configuration conf = new Configuration(); URI uri = new URI(path); @@ -277,7 +274,7 @@ public final class ExtensionStore { try { fileSystem.listStatus(new Path(uri.getPath() + "/README")); } catch (IOException e){ - LOG.error("Exception in registerExtensionMetadata:", e); + LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("README file is not present in the " + path); } PathFilter filter=new PathFilter(){ @@ -292,7 +289,7 @@ public final class ExtensionStore { throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); } } catch (IOException e){ - LOG.error("Exception in registerExtensionMetadata:", e); + LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build."); } FileStatus[] propStatus; @@ -303,13 +300,13 @@ public final class ExtensionStore { + " structure."); } } catch (IOException e){ - LOG.error("Exception in registerExtensionMetadata:", e); + LOG.error("Exception in registering Extension:{}", extensionName, e); throw new ValidationException("Directory is not present in the " + uri.getPath() + "/META" + " structure."); } if 
(!metaStore.checkIfExtensionExists(extensionName)){ - metaStore.storeExtensionMetadataBean(extensionName, path, ExtensionType.CUSTOM, description); + metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description); }else{ throw new ValidationException(extensionName + " already exsists."); } http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java index d0f1c0c..d96fc1f 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java @@ -20,7 +20,7 @@ package org.apache.falcon.extensions.jdbc; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.store.AbstractTestExtensionStore; -import org.apache.falcon.persistence.ExtensionMetadataBean; +import org.apache.falcon.persistence.ExtensionBean; import org.apache.falcon.service.FalconJPAService; import org.apache.hadoop.conf.Configuration; @@ -29,9 +29,10 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import javax.persistence.EntityManager; import javax.persistence.Query; +import java.util.ArrayList; +import java.util.List; /** * Test Cases for ExtensionMetaStore. 
@@ -57,28 +58,46 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore { } @Test - public void dbOpertaions(){ + public void testExtension(){ //insert - stateStore.storeExtensionMetadataBean("test1", "test_location", ExtensionType.TRUSTED, "test_description"); + stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description"); Assert.assertEquals(stateStore.getAllExtensions().size(), 1); //check data - ExtensionMetadataBean bean = stateStore.getDetail("test1"); + ExtensionBean bean = stateStore.getDetail("test1"); Assert.assertEquals(bean.getLocation(), "test_location"); //delete stateStore.deleteExtensionsOfType(ExtensionType.TRUSTED); Assert.assertEquals(stateStore.getAllExtensions().size(), 0); } + @Test + public void testExtensionJob() { + stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description"); + List<String> processes = new ArrayList<>(); + processes.add("testProcess"); + List<String> feeds = new ArrayList<>(); + feeds.add("testFeed"); + + byte[] config = new byte[0]; + stateStore.storeExtensionJob("job1", "test2", feeds, processes, config); + + Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1); + stateStore.deleteExtensionJob("job1"); + Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 0); + } + private void clear() { - EntityManager em = FalconJPAService.get().getEntityManager(); - em.getTransaction().begin(); + EntityManager entityManager = FalconJPAService.get().getEntityManager(); + entityManager.getTransaction().begin(); try { - Query query = em.createNativeQuery("delete from EXTENSION_METADATA"); + Query query = entityManager.createNativeQuery("delete from EXTENSIONS"); + query.executeUpdate(); + query = entityManager.createNativeQuery("delete from EXTENSION_JOBS"); query.executeUpdate(); } finally { - em.getTransaction().commit(); - em.close(); + entityManager.getTransaction().commit(); + entityManager.close(); } } } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 27bea53..50c9b7f 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -19,14 +19,6 @@ package org.apache.falcon.extensions.store; import com.google.common.collect.ImmutableMap; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.net.URISyntaxException; -import java.util.Map; -import javax.persistence.EntityManager; -import javax.persistence.Query; import org.apache.falcon.FalconException; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; @@ -42,6 +34,15 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.URISyntaxException; +import java.util.Map; + /** * Tests for extension store. 
*/ @@ -105,21 +106,21 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { @Test - public void testRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{ - createlibs(); + public void testRegisterExtension() throws IOException, URISyntaxException, FalconException{ + createLibs(); createReadmeAndJar(); createMETA(); store = ExtensionStore.get(); - store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc"); ExtensionMetaStore metaStore = new ExtensionMetaStore(); Assert.assertEquals(metaStore.getAllExtensions().size(), 1); } @Test(expectedExceptions=ValidationException.class) - public void testFailureCaseRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{ + public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException{ store = ExtensionStore.get(); - createlibs(); - store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc"); + createLibs(); + store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc"); } private void createMETA() throws IOException{ @@ -140,7 +141,7 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { br.close(); } - private void createlibs() throws IOException{ + private void createLibs() throws IOException{ Path path = new Path(EXTENSION_PATH); if (fs.exists(path)){ fs.delete(path, true); @@ -169,7 +170,7 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { EntityManager em = FalconJPAService.get().getEntityManager(); em.getTransaction().begin(); try { - Query query = em.createNativeQuery("delete from EXTENSION_METADATA"); + Query query = em.createNativeQuery("delete from EXTENSIONS"); query.executeUpdate(); } finally { em.getTransaction().commit(); http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/pom.xml 
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6a15987..95a101b 100644 --- a/pom.xml +++ b/pom.xml @@ -699,6 +699,12 @@ </dependency> <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${jersey.version}</version> + </dependency> + + <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/prism/pom.xml ---------------------------------------------------------------------- diff --git a/prism/pom.xml b/prism/pom.xml index 11f3944..57cdfe7 100644 --- a/prism/pom.xml +++ b/prism/pom.xml @@ -88,12 +88,6 @@ <dependency> <groupId>org.apache.falcon</groupId> - <artifactId>falcon-common-types</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.falcon</groupId> <artifactId>falcon-oozie-adaptor</artifactId> </dependency> @@ -130,6 +124,11 @@ </dependency> <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + </dependency> + + <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 6f2974d..cd1d4e2 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -18,6 +18,8 @@ package org.apache.falcon.resource.extensions; +import com.sun.jersey.multipart.FormDataParam; +import 
org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; @@ -25,12 +27,14 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.store.StoreAccessException; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.extensions.Extension; import org.apache.falcon.extensions.ExtensionProperties; import org.apache.falcon.extensions.ExtensionService; +import org.apache.falcon.extensions.ExtensionType; import org.apache.falcon.extensions.jdbc.ExtensionMetaStore; import org.apache.falcon.extensions.store.ExtensionStore; -import org.apache.falcon.persistence.ExtensionMetadataBean; +import org.apache.falcon.persistence.ExtensionBean; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; @@ -58,6 +62,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -76,7 +81,6 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name="; public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job="; - public static final String TAG_SEPARATOR = ","; public static final String ASCENDING_SORT_ORDER = "asc"; public static final String DESCENDING_SORT_ORDER = "desc"; @@ -290,18 +294,19 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @POST @Path("submit/{extension-name}") - @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) 
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult submit( @PathParam("extension-name") String extensionName, @Context HttpServletRequest request, - @DefaultValue("") @QueryParam("doAs") String doAsUser) { + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("jobName") String jobName, + @FormDataParam("entities") List<Entity> entities, + @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); try { - List<Entity> entities = generateEntities(extensionName, request); - for (Entity entity : entities) { - submitInternal(entity, doAsUser); - } + entities = getEntityList(extensionName, entities, config); + submitEntities(extensionName, doAsUser, jobName, entities, config); } catch (FalconException | IOException e) { LOG.error("Error when submitting extension job: ", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -309,20 +314,37 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully"); } + private void validateEntities(List<Entity> entities) throws FalconException { + for (Entity entity : entities) { + if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.PROCESS.equals(entity.getEntityType())) { + LOG.error("Cluster entity is not allowed for submission via submitEntities: {}", entity.getName()); + throw new FalconException("Cluster entity is not allowed for submission in extensions submission"); + } + super.validate(entity); + } + } + + private ExtensionType getExtensionType(String extensionName) { + ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + ExtensionBean extensionDetails = metaStore.getDetail(extensionName); + return extensionDetails.getExtensionType(); + } + @POST @Path("submitAndSchedule/{extension-name}") - @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) + 
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA}) @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON}) public APIResult submitAndSchedule( @PathParam("extension-name") String extensionName, @Context HttpServletRequest request, - @DefaultValue("") @QueryParam("doAs") String doAsUser) { + @DefaultValue("") @QueryParam("doAs") String doAsUser, + @QueryParam("jobName") String jobName, + @FormDataParam("entities") List<Entity> entities, + @FormDataParam("config") InputStream config) { checkIfExtensionServiceIsEnabled(); try { - List<Entity> entities = generateEntities(extensionName, request); - for (Entity entity : entities) { - submitInternal(entity, doAsUser); - } + entities = getEntityList(extensionName, entities, config); + submitEntities(extensionName, doAsUser, jobName, entities, config); for (Entity entity : entities) { scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null); } @@ -333,6 +355,33 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully"); } + private void submitEntities(String extensionName, String doAsUser, String jobName, List<Entity> entities, + InputStream configStream) throws FalconException, IOException { + validateEntities(entities); + List<String> feeds = new ArrayList<>(); + List<String> processes = new ArrayList<>(); + for (Entity entity : entities) { + submitInternal(entity, doAsUser); + if (EntityType.FEED.equals(entity.getEntityType())) { + feeds.add(entity.getName()); + } else if (EntityType.PROCESS.equals(entity.getEntityType())) { + processes.add(entity.getName()); + } + } + ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + byte[] configBytes = IOUtils.toByteArray(configStream); + metaStore.storeExtensionJob(jobName, extensionName, feeds, processes, configBytes); + } + + private List<Entity> 
getEntityList(String extensionName, List<Entity> entities, InputStream config) + throws FalconException, IOException { + ExtensionType extensionType = getExtensionType(extensionName); + if (ExtensionType.TRUSTED.equals(extensionType)) { + entities = generateEntities(extensionName, config); + } + return entities; + } + @POST @Path("update/{extension-name}") @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN}) @@ -343,7 +392,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); try { - List<Entity> entities = generateEntities(extensionName, request); + List<Entity> entities = generateEntities(extensionName, request.getInputStream()); for (Entity entity : entities) { super.update(entity, entity.getEntityType().name(), entity.getName(), null); } @@ -364,7 +413,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @DefaultValue("") @QueryParam("doAs") String doAsUser) { checkIfExtensionServiceIsEnabled(); try { - List<Entity> entities = generateEntities(extensionName, request); + List<Entity> entities = generateEntities(extensionName, request.getInputStream()); for (Entity entity : entities) { super.validate(entity); } @@ -385,8 +434,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { JSONArray results; try { - List<String> extensions = ExtensionStore.get().getExtensions(); - results = buildEnumerateResult(extensions); + results = buildEnumerateResult(); } catch (StoreAccessException e) { LOG.error("Failed when accessing extension store.", e); throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -442,7 +490,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return ExtensionStore.get().deleteExtensionMetadata(extensionName); + return 
ExtensionStore.get().deleteExtension(extensionName); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -454,11 +502,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { @Produces(MediaType.TEXT_PLAIN) public String registerExtensionMetadata( @PathParam("extension-name") String extensionName, - @QueryParam("path") String path, @QueryParam("description") String description){ + @QueryParam("path") String path, + @QueryParam("description") String description){ checkIfExtensionServiceIsEnabled(); validateExtensionName(extensionName); try { - return ExtensionStore.get().registerExtensionMetadata(extensionName, path, description); + return ExtensionStore.get().registerExtension(extensionName, path, description); } catch (Throwable e) { throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -486,18 +535,18 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { } } - private static JSONArray buildEnumerateResult(final List<String> extensions) throws FalconException { + private static JSONArray buildEnumerateResult() throws FalconException { JSONArray results = new JSONArray(); - ExtensionMetaStore metricStore = ExtensionStore.get().getMetaStore(); - List<ExtensionMetadataBean> beanList = metricStore.getAllExtensions(); - for (ExtensionMetadataBean bean : beanList) { + ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore(); + List<ExtensionBean> extensionBeanList = metaStore.getAllExtensions(); + for (ExtensionBean extensionBean : extensionBeanList) { JSONObject resultObject = new JSONObject(); try { - resultObject.put(EXTENSION_NAME, bean.getExtensionName().toLowerCase()); - resultObject.put(EXTENSION_TYPE, bean.getExtensionType()); - resultObject.put(EXTENSION_DESC, bean.getDescription()); - resultObject.put(EXTENSION_LOCATION, bean.getLocation()); + resultObject.put(EXTENSION_NAME, 
extensionBean.getExtensionName().toLowerCase()); + resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType()); + resultObject.put(EXTENSION_DESC, extensionBean.getDescription()); + resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation()); } catch (JSONException e) { throw new FalconException(e); } @@ -507,12 +556,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { return results; } - private List<Entity> generateEntities(String extensionName, HttpServletRequest request) + private List<Entity> generateEntities(String extensionName, InputStream configStream) throws FalconException, IOException { // get entities for extension job Properties properties = new Properties(); - properties.load(request.getInputStream()); - List<Entity> entities = extension.getEntities(extensionName, request.getInputStream()); + properties.load(configStream); + List<Entity> entities = extension.getEntities(extensionName, configStream); // add tags on extension name and job String jobName = properties.getProperty(ExtensionProperties.JOB_NAME.getName()); @@ -528,7 +577,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { throw new ValidationException("No extension resources found for " + extensionName); } - ExtensionMetadataBean bean = metaStore.getDetail(extensionName); + ExtensionBean bean = metaStore.getDetail(extensionName); JSONObject resultObject = new JSONObject(); try { resultObject.put(EXTENSION_NAME, bean.getExtensionName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/src/build/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index d910c96..04e267f 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -70,7 +70,12 @@ </Match> <Match> - <Class name="org.apache.falcon.persistence.ExtensionMetadataBean" /> + <Class 
name="org.apache.falcon.persistence.ExtensionBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> + </Match> + + <Match> + <Class name="org.apache.falcon.persistence.ExtensionJobsBean" /> <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> </Match>
