Author: virag
Date: Thu Jan 10 22:34:52 2013
New Revision: 1431711
URL: http://svn.apache.org/viewvc?rev=1431711&view=rev
Log:
OOZIE-1161 Remove unnecessary db updates for some of the blobs like
'missing_dependencies' of Coordinator Action (virag)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForModifiedTimeJPAExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/release-log.txt
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1431711&r1=1431710&r2=1431711&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
Thu Jan 10 22:34:52 2013
@@ -61,6 +61,8 @@ import org.apache.openjpa.persistence.jd
// Update query for Start
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update
CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp =
:lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending
= :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where
w.id = :id"),
+ @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query =
"update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime
where w.id = :id"),
+
@NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query =
"delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status =
'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
@NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from
CoordinatorActionBean a where a.id = :id and (a.status = 'WAITING' OR a.status
= 'READY')"),
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1431711&r1=1431710&r2=1431711&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Thu Jan 10 22:34:52 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.coord.CoordELEva
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import
org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
@@ -113,6 +114,7 @@ public class CoordActionInputCheckXComma
StringBuilder actionXml = new
StringBuilder(coordAction.getActionXml());
Instrumentation.Cron cron = new Instrumentation.Cron();
+ boolean isChangeInDependency = false;
try {
Configuration actionConf = new XConfiguration(new
StringReader(coordAction.getRunConf()));
cron.start();
@@ -120,7 +122,8 @@ public class CoordActionInputCheckXComma
StringBuilder nonExistList = new StringBuilder();
StringBuilder nonResolvedList = new StringBuilder();
String firstMissingDependency = "";
-
CoordCommandUtils.getResolvedList(coordAction.getMissingDependencies(),
nonExistList, nonResolvedList);
+ String missingDeps = coordAction.getMissingDependencies();
+ CoordCommandUtils.getResolvedList(missingDeps, nonExistList,
nonResolvedList);
// For clarity regarding which is the missing dependency in
synchronous order
// instead of printing entire list, some of which, may be available
@@ -136,7 +139,11 @@ public class CoordActionInputCheckXComma
if (nonResolvedList.length() > 0 && status == false) {
nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
}
- coordAction.setMissingDependencies(nonExistList.toString());
+ String nonExistListStr = nonExistList.toString();
+ if (!missingDeps.equals(nonExistListStr)) {
+ isChangeInDependency = true;
+ coordAction.setMissingDependencies(nonExistListStr);
+ }
String pushDeps = coordAction.getPushMissingDependencies();
if (status == true && (pushDeps == null || pushDeps.length() ==
0)) {
coordAction.setStatus(CoordinatorAction.Status.READY);
@@ -164,7 +171,13 @@ public class CoordActionInputCheckXComma
cron.stop();
if(jpaService != null) {
try {
- jpaService.execute(new
org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(coordAction));
+ if (isChangeInDependency) {
+ jpaService.execute(new
org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor(
+ coordAction));
+ }
+ else {
+ jpaService.execute(new
CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
+ }
}
catch(JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1021,
jex.getMessage(), jex);
@@ -530,6 +543,7 @@ public class CoordActionInputCheckXComma
/* (non-Javadoc)
* @see org.apache.oozie.command.XCommand#eagerLoadState()
*/
+ // TODO - why is loadState() being called from eagerLoadState()?
@Override
protected void eagerLoadState() throws CommandException {
loadState();
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1431711&r1=1431710&r2=1431711&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Thu Jan 10 22:34:52 2013
@@ -40,6 +40,7 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import
org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -80,6 +81,7 @@ public class CoordPushDependencyCheckXCo
@Override
protected Void execute() throws CommandException {
+ boolean isChangeInDependency = false;
String pushDeps = coordAction.getPushMissingDependencies();
if (pushDeps == null || pushDeps.length() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
@@ -101,7 +103,11 @@ public class CoordPushDependencyCheckXCo
if (missingDeps.size() > 0) {
pushDeps = StringUtils.join(missingDeps,
CoordELFunctions.INSTANCE_SEPARATOR);
- coordAction.setPushMissingDependencies(pushDeps);
+ // only set the push missing deps if some of the deps are available
+ if (availableDepList.size() != 0) {
+ isChangeInDependency = true;
+ coordAction.setPushMissingDependencies(pushDeps);
+ }
// Checking for timeout
if (!isTimeout()) {
queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()),
@@ -112,6 +118,7 @@ public class CoordPushDependencyCheckXCo
}
}
else { // All push-based dependencies are available
+ isChangeInDependency = true;
coordAction.setPushMissingDependencies("");
if (coordAction.getMissingDependencies() == null ||
coordAction.getMissingDependencies().length() == 0) {
coordAction.setStatus(CoordinatorAction.Status.READY);
@@ -120,7 +127,7 @@ public class CoordPushDependencyCheckXCo
}
}
- updateCoordAction(coordAction, availableDepList);
+ updateCoordAction(coordAction, availableDepList, isChangeInDependency);
return null;
}
@@ -156,18 +163,25 @@ public class CoordPushDependencyCheckXCo
return missingDeps;
}
- private void updateCoordAction(CoordinatorActionBean coordAction2,
List<String> availPartitionList) throws CommandException {
+ private void updateCoordAction(CoordinatorActionBean coordAction2,
List<String> availPartitionList,
+ boolean isChangeInDependency) throws CommandException {
coordAction.setLastModifiedTime(new Date());
if (jpaService != null) {
try {
- jpaService.execute(new
CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
- PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
- if (pdms.removeAvailablePartitions(
-
PartitionDependencyManagerService.createPartitionWrappers(availPartitionList),
actionId)) {
- LOG.debug("Succesfully removed partitions for actionId:
[{0}] from available Map ", actionId);
+ if (isChangeInDependency) {
+ jpaService.execute(new
CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+ PartitionDependencyManagerService pdms = Services.get()
+ .get(PartitionDependencyManagerService.class);
+ if (pdms.removeAvailablePartitions(
+
PartitionDependencyManagerService.createPartitionWrappers(availPartitionList),
actionId)) {
+ LOG.debug("Succesfully removed partitions for
actionId: [{0}] from available Map ", actionId);
+ }
+ else {
+ LOG.warn("Unable to remove partitions for actionId:
[{0}] from available Map ", actionId);
+ }
}
else {
- LOG.warn("Unable to remove partitions for actionId: [{0}]
from available Map ", actionId);
+ jpaService.execute(new
CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
}
}
catch (JPAExecutorException jex) {
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForModifiedTimeJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForModifiedTimeJPAExecutor.java?rev=1431711&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForModifiedTimeJPAExecutor.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdateForModifiedTimeJPAExecutor.java
Thu Jan 10 22:34:52 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Updates only the last-modified timestamp of a coordinator action row.
+ * It runs the UPDATE_COORD_ACTION_FOR_MODIFIED_DATE named query, which sets
+ * lastModifiedTimestamp on the row whose id matches the given bean.
+ * The timestamp written is the time at which execute() runs (a fresh Date),
+ * not the value held by the bean. The return type is Void.
+ */
+public class CoordActionUpdateForModifiedTimeJPAExecutor implements
JPAExecutor<Void> {
+
+ private CoordinatorActionBean coordAction = null;
+
+ public CoordActionUpdateForModifiedTimeJPAExecutor(CoordinatorActionBean
coordAction) {
+ ParamChecker.notNull(coordAction, "coordAction");
+ this.coordAction = coordAction;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Binds the bean's id and the current time to the named update query and
+ * executes it. Any exception raised while doing so is wrapped in a
+ * JPAExecutorException carrying ErrorCode.E0603.
+ *
+ * @see
org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+ * EntityManager)
+ */
+ @Override
+ public Void execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q =
em.createNamedQuery("UPDATE_COORD_ACTION_FOR_MODIFIED_DATE");
+ q.setParameter("id", coordAction.getId());
+ q.setParameter("lastModifiedTime", new Date());
+ q.executeUpdate();
+ // Void has no instances, so null is the only value that can be returned
+ return null;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Returns the fixed name of this executor, which equals the class name.
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "CoordActionUpdateForModifiedTimeJPAExecutor";
+ }
+}
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java?rev=1431711&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
(added)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionUpdateForModifiedTimeJPAExecutor.java
Thu Jan 10 22:34:52 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestCoordActionUpdateForModifiedTimeJPAExecutor extends
XDataTestCase {
+ Services services;
+
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables();
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ @Test
+ public void testCoordActionUpdateModifiedTime() throws Exception {
+ int actionNum = 1;
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ CoordinatorActionBean action =
addRecordToCoordActionTable(job.getId(), actionNum,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+ _testCoordActionUpdateModifiedTime(action);
+ }
+
+ private void _testCoordActionUpdateModifiedTime(CoordinatorActionBean
action) throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ Date currentDate = new Date();
+ assertTrue(currentDate.getTime() -
action.getLastModifiedTime().getTime() > 0);
+ // Run the executor under test, then re-read the action and expect a last-modified time later than currentDate. NOTE(review): the strict "> 0" checks assume at least 1 ms elapses between the compared timestamps - confirm.
+ CoordActionUpdateForModifiedTimeJPAExecutor coordUpdCmd = new
CoordActionUpdateForModifiedTimeJPAExecutor(
+ action);
+ jpaService.execute(coordUpdCmd);
+
+ CoordActionGetJPAExecutor coordGetCmd = new
CoordActionGetJPAExecutor(action.getId());
+ CoordinatorActionBean newAction = jpaService.execute(coordGetCmd);
+
+ assertNotNull(newAction);
+ assertTrue(newAction.getLastModifiedTime().getTime() -
currentDate.getTime() > 0);
+ }
+
+}
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1431711&r1=1431710&r2=1431711&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Jan 10 22:34:52 2013
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1161 Remove unnecessary db updates for some of the blobs like
'missing_dependencies' of Coordinator Action (virag)
OOZIE-1156 Make all the latest/future instances as pull dependences (virag)
OOZIE-1144 OOZIE-1137 breaks the sharelib (rkanter)
OOZIE-1035 Improve forkjoin validation to allow same errorTo transitions
(rkanter)