This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 310d504b5314a44c4b290dfefc84451e5714e2b5 Author: 刈刀 <[email protected]> AuthorDate: Mon Aug 2 15:54:53 2021 +0800 change the variant name --- .../rocketmq/streams/lease/LeaseComponent.java | 38 +++++++++++----------- .../lease/service/impl/BasedLesaseImpl.java | 2 +- .../lease/service/impl/LeaseServiceImpl.java | 21 ++++++------ .../lease/service/storages/DBLeaseStorage.java | 6 ++-- .../rocketmq/streams/lease/LeaseComponentTest.java | 3 +- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/LeaseComponent.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/LeaseComponent.java index 3a527a3..27817f6 100644 --- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/LeaseComponent.java +++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/LeaseComponent.java @@ -13,10 +13,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */package org.apache.rocketmq.streams.lease; - -import java.util.Properties; + */ +package org.apache.rocketmq.streams.lease; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.component.AbstractComponent; import org.apache.rocketmq.streams.common.component.ComponentCreator; import org.apache.rocketmq.streams.common.component.ConfigureDescriptor; @@ -27,11 +28,11 @@ import org.apache.rocketmq.streams.lease.service.ILeaseService; import org.apache.rocketmq.streams.lease.service.ILeaseStorage; import org.apache.rocketmq.streams.lease.service.impl.LeaseServiceImpl; import org.apache.rocketmq.streams.lease.service.impl.MockLeaseImpl; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.lease.service.storages.DBLeaseStorage; import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent; +import java.util.Properties; + /** * 通过db实现租约和锁,可以更轻量级,减少其他中间件的依赖 使用主备场景,只有一个实例运行,当当前实例挂掉,在一定时间内,会被其他实例接手 也可以用于全局锁 * @@ -50,10 +51,10 @@ public class LeaseComponent extends AbstractComponent<ILeaseService> { } public static LeaseComponent getInstance() { - if(leaseComponent==null){ - synchronized (LeaseComponent.class){ - if(leaseComponent==null){ - leaseComponent =ComponentCreator.getComponent(null,LeaseComponent.class); + if (leaseComponent == null) { + synchronized (LeaseComponent.class) { + if (leaseComponent == null) { + leaseComponent = ComponentCreator.getComponent(null, LeaseComponent.class); } } } @@ -79,25 +80,24 @@ public class LeaseComponent extends AbstractComponent<ILeaseService> { protected boolean initProperties(Properties properties) { String connectType = properties.getProperty(JDBC_URL); if (StringUtil.isEmpty(connectType)) { - MockLeaseImpl mockLease = new MockLeaseImpl(); - this.leaseService=mockLease; + this.leaseService = new MockLeaseImpl(); return true; } - LeaseServiceImpl leaseService= new LeaseServiceImpl(); - String storageName=ComponentCreator.getProperties().getProperty(ConfigureFileKey.LEASE_STORAGE_NAME); - ILeaseStorage storasge=null; - if(StringUtil.isEmpty(storageName)){ + LeaseServiceImpl leaseService = new LeaseServiceImpl(); + String storageName = ComponentCreator.getProperties().getProperty(ConfigureFileKey.LEASE_STORAGE_NAME); + ILeaseStorage storasge = null; + if (StringUtil.isEmpty(storageName)) { String jdbc = properties.getProperty(AbstractComponent.JDBC_DRIVER); String url = properties.getProperty(AbstractComponent.JDBC_URL); String userName = properties.getProperty(AbstractComponent.JDBC_USERNAME); String password = properties.getProperty(AbstractComponent.JDBC_PASSWORD); - storasge=new DBLeaseStorage(jdbc,url,userName,password); - }else { - storasge= (ILeaseStorage)ServiceLoaderComponent.getInstance(ILeaseStorage.class).loadService(storageName); + storasge = new DBLeaseStorage(jdbc, url, userName, password); + } else { + storasge = (ILeaseStorage)ServiceLoaderComponent.getInstance(ILeaseStorage.class).loadService(storageName); } leaseService.setLeaseStorage(storasge); - this.leaseService=leaseService; + this.leaseService = leaseService; return true; } } diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java index 21db98d..e400b8d 100644 --- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java +++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/BasedLesaseImpl.java @@ -42,7 +42,7 @@ public abstract class BasedLesaseImpl implements ILeaseService { private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class); private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_"; - private static AtomicBoolean syncStart = new AtomicBoolean(false); + private static final AtomicBoolean syncStart = new AtomicBoolean(false); private static final int synTime = 120; // 5分钟的一致性hash同步时间太久了,改为2分钟 protected ScheduledExecutorService taskExecutor = null; protected int leaseTerm = 300 * 2; // 租约时间 diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/LeaseServiceImpl.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/LeaseServiceImpl.java index 860710a..23e38f5 100644 --- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/LeaseServiceImpl.java +++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/impl/LeaseServiceImpl.java @@ -91,8 +91,7 @@ public class LeaseServiceImpl extends BasedLesaseImpl { } Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseSecond);// 默认锁定5分钟,用完需要立刻释放.如果时间不同步,可能导致锁失败 - boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的时间是leaseTerm - return success; + return tryGetLease(lockerName, nextLeaseDate); } @Override @@ -136,7 +135,7 @@ public class LeaseServiceImpl extends BasedLesaseImpl { Date nextLeaseDate = DateUtil.addSecond(new Date(), lockTimeSecond); boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的时间是leaseTerm - if (success == false) { + if (!success) { return false; } leaseName2Date.put(lockerName, nextLeaseDate); @@ -175,7 +174,7 @@ public class LeaseServiceImpl extends BasedLesaseImpl { } private class HoldLockTask extends ApplyTask { - protected volatile boolean iscontinue = true; + protected volatile boolean isContinue = true; protected LeaseServiceImpl leaseService; protected ScheduledExecutorService scheduledExecutor; @@ -191,20 +190,20 @@ public class LeaseServiceImpl extends BasedLesaseImpl { } public void close() { - iscontinue = false; + isContinue = false; if (scheduledExecutor != null) { scheduledExecutor.shutdown(); } } - public boolean isIscontinue() { - return iscontinue; + public boolean isContinue() { + return isContinue; } @Override public void run() { try { - if (!iscontinue) { + if (!isContinue) { return; } Date leaseDate = applyLeaseTask(leaseTerm, name, new AtomicBoolean(false)); @@ -213,14 +212,14 @@ public class LeaseServiceImpl extends BasedLesaseImpl { LOG.debug("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 续约锁成功, 租约到期时间为 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate)); } else { - iscontinue = false; + isContinue = false; synchronized (leaseService) { holdLockTasks.remove(name); } LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 续约锁失败,续锁程序会停止"); } } catch (Exception e) { - iscontinue = false; + isContinue = false; LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 续约锁出现异常,续锁程序会停止", e); } @@ -256,7 +255,7 @@ public class LeaseServiceImpl extends BasedLesaseImpl { @Override public Boolean get() throws InterruptedException, ExecutionException { - while (isDone() == false) { + while (!isDone()) { Thread.sleep(1000); } return true; diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java index b99132d..d1ba078 100644 --- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java +++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java @@ -176,9 +176,9 @@ public class DBLeaseStorage implements ILeaseStorage { leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map)); leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class)); leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class)); - Integer stauts = getMapValue("status", map, Integer.class); - if (stauts != null) { - leaseInfo.setStatus(stauts); + Integer status = getMapValue("status", map, Integer.class); + if (status != null) { + leaseInfo.setStatus(status); } leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map)); Long version = getMapLongValue("version", map); diff --git a/rocketmq-streams-lease/src/test/java/org/apache/rocketmq/streams/lease/LeaseComponentTest.java b/rocketmq-streams-lease/src/test/java/org/apache/rocketmq/streams/lease/LeaseComponentTest.java index 6404e9f..075a9b4 100644 --- a/rocketmq-streams-lease/src/test/java/org/apache/rocketmq/streams/lease/LeaseComponentTest.java +++ b/rocketmq-streams-lease/src/test/java/org/apache/rocketmq/streams/lease/LeaseComponentTest.java @@ -112,8 +112,7 @@ public class LeaseComponentTest { return true; } - boolean success = leaseService.holdLock(name, lockName, leaseTime); - return success; + return leaseService.holdLock(name, lockName, leaseTime); } }
