Fixing checkstyle errors
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4918e3ad Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4918e3ad Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4918e3ad Branch: refs/heads/samza-standalone Commit: 4918e3ad7c622a36f6c8d8dc8eb6c7778fdaac05 Parents: 0a2b63a Author: navina <[email protected]> Authored: Fri Dec 23 17:03:25 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 17:03:25 2016 -0800 ---------------------------------------------------------------------- .../org/apache/samza/config/JavaJobConfig.java | 21 ++++++++- .../samza/config/JobCoordinatorConfig.java | 23 ++++++++-- .../java/org/apache/samza/config/ZkConfig.java | 21 ++++++++- .../task/SimpleGroupByContainerCount.java | 30 ++++++++++--- .../SimpleGroupByContainerCountFactory.java | 1 - .../leaderelection/LeaderElector.java | 19 ++++++++ .../processor/SamzaContainerController.java | 41 ++++++++++++----- .../samza/zk/BarrierForVersionUpgrade.java | 19 ++++++++ .../samza/zk/ScheduleAfterDebounceTime.java | 34 ++++++++++++--- .../samza/zk/ZkBarrierForVersionUpgrade.java | 39 +++++++++++++---- .../java/org/apache/samza/zk/ZkController.java | 27 +++++++++++- .../org/apache/samza/zk/ZkControllerImpl.java | 36 ++++++++++++--- .../org/apache/samza/zk/ZkJobCoordinator.java | 46 +++++++++++++------- .../samza/zk/ZkJobCoordinatorFactory.java | 19 ++++++++ .../java/org/apache/samza/zk/ZkKeyBuilder.java | 28 ++++++++++-- .../org/apache/samza/zk/ZkLeaderElector.java | 23 +++++++++- .../java/org/apache/samza/zk/ZkListener.java | 19 ++++++++ .../main/java/org/apache/samza/zk/ZkUtils.java | 46 +++++++++++++++----- 18 files changed, 413 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java index c0747f0..24b9427 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.config; public class JavaJobConfig extends MapConfig { @@ -5,7 +24,7 @@ public class JavaJobConfig extends MapConfig { private static final String JOB_ID = "job.id"; // streaming.job_id private static final String DEFAULT_JOB_ID = "1"; - public JavaJobConfig (Config config) { + public JavaJobConfig(Config config) { super(config); } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java index c8e496e..4bb66de 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -1,14 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.config; import com.google.common.base.Strings; -import org.apache.samza.coordinator.JobCoordinatorFactory; -import org.apache.samza.util.Util; public class JobCoordinatorConfig extends MapConfig { // TODO: Change this to job-coordinator.factory private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory"; - public JobCoordinatorConfig (Config config) { + public JobCoordinatorConfig(Config config) { super(config); } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java index 973db42..31b1eda 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.config; public class ZkConfig extends MapConfig { @@ -9,7 +28,7 @@ public class ZkConfig extends MapConfig { public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; - public ZkConfig (Config config) { + public ZkConfig(Config config) { super(config); } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java index 359c4ed..cf225c3 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -1,5 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.container.grouper.task; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -7,10 +30,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.samza.container.LocalityManager; -import org.apache.samza.container.TaskName; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.TaskModel; public class SimpleGroupByContainerCount implements TaskNameGrouper { @@ -18,6 +37,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper { public SimpleGroupByContainerCount() { this.startContainerCount = 1; } + public SimpleGroupByContainerCount(int containerCount) { if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container"); this.startContainerCount = containerCount; @@ -33,7 +53,7 @@ public class SimpleGroupByContainerCount implements TaskNameGrouper { } public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) { - if(containersIds == null) + if (containersIds == null) return this.group(tasks); int containerCount = containersIds.size(); http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java index 02918f6..b1c6b92 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCountFactory.java @@ -20,7 +20,6 @@ package org.apache.samza.container.grouper.task; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; /** http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java index fc3cac9..2565ee2 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.coordinator.leaderelection; public interface LeaderElector { http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java index fb227d0..d448d30 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.processor; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -38,12 +57,12 @@ public class SamzaContainerController { * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer} * Requests to execute a container are submitted to the {@link ExecutorService} * - * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or - * {@link org.apache.samza.task.AsyncStreamTask} + * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or + * {@link org.apache.samza.task.AsyncStreamTask} * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances - * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance + * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance */ - public SamzaContainerController ( + public SamzaContainerController( Object taskFactory, long containerShutdownMs, String processorId, @@ -62,14 +81,14 @@ public class SamzaContainerController { /** * Instantiates a container and submits to the executor. This method does not actually wait for the container to * fully start-up. For such a behavior, see {@link #awaitStart(long)} - * + * <p> * <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to * ensure that the container has been stopped with stopContainer before invoking this method.</i> * - * @param containerModel {@link ContainerModel} instance to use for the current run of the Container - * @param config Complete configuration map used by the Samza job + * @param containerModel {@link ContainerModel} instance to use for the current run of the Container + * @param config Complete configuration map used by the Samza job * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams - * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments + * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments */ public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) { LocalityManager localityManager = null; @@ -96,7 +115,7 @@ public class SamzaContainerController { * * @param timeoutMs Maximum time to wait, in milliseconds * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting - * time elapsed + * time elapsed * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up */ public boolean awaitStart(long timeoutMs) throws InterruptedException { @@ -107,14 +126,14 @@ public class SamzaContainerController { * Stops a running container, if any. Invoking this method multiple times does not have any side-effects. */ public void stopContainer() { - if(container == null) { + if (container == null) { log.warn("Shutdown before a container was created."); return; } container.shutdown(); try { - if(containerFuture != null) + if (containerFuture != null) containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException e) { log.error("Ran into problems while trying to stop the container in the processor!", e); http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java index 691aced..c2c4c23 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import java.util.List; http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index e217dab..d174938 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -1,18 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ScheduleAfterDebounceTime { public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); - public static final long timeoutMs = 1000*10; + public static final long TIMEOUT_MS = 1000 * 10; public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; @@ -24,19 +45,18 @@ public class ScheduleAfterDebounceTime { new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build()); private final Map<String, ScheduledFuture> futureHandles = new HashMap<>(); - public ScheduleAfterDebounceTime () { - + public ScheduleAfterDebounceTime() { } - synchronized public void scheduleAfterDebounceTime (String actionName, long debounceTimeMs, Runnable runnable) {//, final ReadyToCreateJobModelListener listener) { + synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) { //, final ReadyToCreateJobModelListener listener) { // check if this action has been scheduled already ScheduledFuture sf = futureHandles.get(actionName); - if(sf != null && !sf.isDone()) { + if (sf != null && !sf.isDone()) { LOG.info(">>>>>>>>>>>DEBOUNCE: cancel future for " + actionName); // attempt to cancel - if(! sf.cancel(false) ) { + if (!sf.cancel(false)) { try { - sf.get(timeoutMs, TimeUnit.MILLISECONDS); + sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { // we ignore the exception LOG.warn("cancel for action " + actionName + " failed with ", e); http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 60a06da..524afed 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -1,6 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import java.util.List; + import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.slf4j.Logger; @@ -17,7 +37,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { final private String barrierPrefix; - public ZkBarrierForVersionUpgrade( ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { + public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { this.zkUtils = zkUtils; keyBuilder = zkUtils.getKeyBuilder(); @@ -83,7 +103,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { // Find out the event & Log boolean allIn = true; - if(currentChildren == null) { + if (currentChildren == null) { LOG.info("Got handleChildChange with null currentChildren"); return; } @@ -101,33 +121,34 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { // check if all the names are in - for(String n : names) { - if(!currentChildren.contains(n)) { + for (String n : names) { + if (!currentChildren.contains(n)) { LOG.info("node " + n + " is still not in the list "); allIn = false; break; } } - if(allIn) { + if (allIn) { LOG.info("ALl nodes reached the barrier"); callback.run(); // all the names have registered } } } - class ZkBarrierReachedHandler implements IZkDataListener { + class ZkBarrierReachedHandler implements IZkDataListener { private final ScheduleAfterDebounceTime debounceTimer; private final String barrierPathDone; private final Runnable callback; + public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) { this.barrierPathDone = barrierPathDone; - this.callback = callback; + this.callback = callback; this.debounceTimer = debounceTimer; } @Override public void handleDataChange(String dataPath, Object data) - throws Exception { + throws Exception { String done = (String) data; LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done); if (done.equals(BARRIER_DONE)) { @@ -140,7 +161,7 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { @Override public void handleDataDeleted(String dataPath) - throws Exception { + throws Exception { LOG.warn("barrier done got deleted at " + dataPath); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java index 20e55ab..76ff8d2 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java @@ -1,11 +1,34 @@ -package org.apache.samza.zk; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; public interface ZkController { - void register (); + void register(); + boolean isLeader(); + void notifyJobModelChange(String version); + void stop(); + void listenToProcessorLiveness(); + String currentJobModelVersion(); } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java index fdd1f02..b00a3ee 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import org.I0Itec.zkclient.IZkChildListener; @@ -18,7 +37,7 @@ public class ZkControllerImpl implements ZkController { private final ZkLeaderElector leaderElector; private final ScheduleAfterDebounceTime debounceTimer; - public ZkControllerImpl (String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) { + public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) { this.processorIdStr = processorIdStr; this.zkUtils = zkUtils; this.zkListener = zkListener; @@ -30,11 +49,10 @@ public class ZkControllerImpl implements ZkController { @Override public void register() { - // TODO - make a loop here with some number of attempts. // possibly split into two method - becomeLeader() and becomeParticipant() boolean isLeader = leaderElector.tryBecomeLeader(); - if(isLeader) { + if (isLeader) { listenToProcessorLiveness(); // zkUtils.subscribeToProcessorChange(zkProcessorChangeListener); @@ -49,7 +67,7 @@ public class ZkControllerImpl implements ZkController { private void init() { ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); - zkUtils.makeSurePersistentPathsExists(new String[] { + zkUtils.makeSurePersistentPathsExists(new String[]{ keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix()}); } @@ -82,11 +100,13 @@ public class ZkControllerImpl implements ZkController { } // Only by Leader - class ZkProcessorChangeHandler implements IZkChildListener { + class ZkProcessorChangeHandler implements IZkChildListener { private final ScheduleAfterDebounceTime debounceTimer; + public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) { this.debounceTimer = debounceTimer; } + /** * Called when the children of the given path changed. * @@ -106,11 +126,14 @@ public class ZkControllerImpl implements ZkController { class ZkJobModelVersionChangeHandler implements IZkDataListener { private final ScheduleAfterDebounceTime debounceTimer; + public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) { this.debounceTimer = debounceTimer; } + /** * called when job model version gets updated + * * @param dataPath * @param data * @throws Exception @@ -123,6 +146,7 @@ public class ZkControllerImpl implements ZkController { debounceTimer .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data)); } + @Override public void handleDataDeleted(String dataPath) throws Exception { throw new SamzaException("version update path has been deleted!."); @@ -130,7 +154,7 @@ public class ZkControllerImpl implements ZkController { } public void shutdown() { - if(debounceTimer != null) + if (debounceTimer != null) debounceTimer.stopScheduler(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index f661547..e83b16b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -1,17 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; -import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.JobModelManager$; -import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.processor.SamzaContainerController; import org.apache.samza.system.StreamMetadataCache; @@ -22,7 +35,11 @@ import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * JobCoordinator for stand alone processor managed via Zookeeper. @@ -60,12 +77,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); //should not have any state in it - // TEMP for model generation //////////////////////////////// NEEDS TO BE REPLACED ////////////////////////////////////// JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); Map<String, SystemAdmin> systemAdmins = new HashMap<>(); - for (String systemName: systemConfig.getSystemNames()) { + for (String systemName : systemConfig.getSystemNames()) { String systemFactoryClassName = systemConfig.getSystemFactory(systemName); if (systemFactoryClassName == null) { log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); @@ -75,11 +91,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); } - streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock + streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock .instance()); - //////////////////////////////////////////////////////////////////////////////////////////// } @@ -99,8 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { } @Override - public int getProcessorId() - { + public int getProcessorId() { return processorId; } @@ -125,9 +139,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { List<String> currentProcessors = zkUtils.getActiveProcessors(); // get the current version - String currentJMVersion = zkUtils.getJobModelVersion(); + String currentJMVersion = zkUtils.getJobModelVersion(); String nextJMVersion; - if(currentJMVersion == null) + if (currentJMVersion == null) nextJMVersion = "1"; else nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1); @@ -139,7 +153,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { //JobModel jobModel = new JobModel(config, containers); StringBuilder sb = new StringBuilder(); List<Integer> containerIds = new ArrayList<>(); - for(String processor: currentProcessors){ + for (String processor : currentProcessors) { String zkProcessorId = keyBuilder.parseContainerIdFromProcessorId(processor); sb.append(zkProcessorId).append(","); containerIds.add(Integer.valueOf(zkProcessorId)); @@ -163,7 +177,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion); } - ////////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////////////////////////////////////////////////////////////// @Override public void onProcessorChange(List<String> processorIds) { // Reset debounce Timer http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 90b0097..02db340 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import org.apache.samza.config.Config; http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java index 42d0c86..efe3349 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import org.apache.samza.SamzaException; @@ -10,10 +29,11 @@ public class ZkKeyBuilder { public static final String JOBMODEL_VERSION_PATH = "jobModelVersion"; - public ZkKeyBuilder () { + public ZkKeyBuilder() { this(""); } - public ZkKeyBuilder (String pathPrefix) { + + public ZkKeyBuilder(String pathPrefix) { this.pathPrefix = pathPrefix; } @@ -23,12 +43,12 @@ public class ZkKeyBuilder { public static String parseIdFromPath(String path) { if (path != null) - return path.substring(path.indexOf(PROCESSOR_ID_PREFIX)); + return path.substring(path.indexOf(PROCESSOR_ID_PREFIX)); return null; } public static String parseContainerIdFromProcessorId(String prId) { - if(prId == null) + if (prId == null) throw new SamzaException("processor id is null"); return prId.substring(prId.indexOf(PROCESSOR_ID_PREFIX) + PROCESSOR_ID_PREFIX.length()); http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index 8bffeb6..ecc0f0b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -1,12 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; -import java.util.Arrays; import org.I0Itec.zkclient.IZkDataListener; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.leaderelection.LeaderElector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.Random; @@ -23,7 +42,7 @@ public class ZkLeaderElector implements LeaderElector { private String currentSubscription = null; private final Random random = new Random(); - public ZkLeaderElector (String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) { + public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) { this.processorIdStr = processorIdStr; this.zkUtils = zkUtils; this.keyBuilder = this.zkUtils.getKeyBuilder(); http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java index 4a1c491..a0c69c6 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; import java.util.List; http://git-wip-us.apache.org/repos/asf/samza/blob/4918e3ad/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index de8c213..05100a5 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -1,6 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.zk; -import java.io.IOException; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; @@ -17,6 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; @@ -75,8 +94,9 @@ public class ZkUtils { public ZkKeyBuilder getKeyBuilder() { return keyBuilder; } + public void makeSurePersistentPathsExists(String[] paths) { - for(String path: paths) { + for (String path : paths) { if (!zkClient.exists(path)) { zkClient.createPersistent(path, true); } @@ -119,13 +139,14 @@ public class ZkUtils { throw new SamzaException(e); } } + public JobModel getJobModel(String jobModelVersion) { LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion)); Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion)); ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); JobModel jm; try { - jm = mmapper.readValue((String)data, JobModel.class); + jm = mmapper.readValue((String) data, JobModel.class); } catch (IOException e) { throw new SamzaException("failed to read JobModel from ZK", e); } @@ -140,23 +161,24 @@ public class ZkUtils { public void publishNewJobModelVersion(String oldVersion, String newVersion) { Stat stat = new Stat(); - String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat); + String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat); LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat.getVersion() + ")"); - if(currentVersion != null && !currentVersion.equals(oldVersion)) { - throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion); + if (currentVersion != null && !currentVersion.equals(oldVersion)) { + throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion + ", got " + currentVersion); } int dataVersion = stat.getVersion(); stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion); - if(stat.getVersion() != dataVersion + 1) + if (stat.getVersion() != dataVersion + 1) throw new SamzaException("Someone changed data version of the JMVersion while Leader was generating a new one. current= " + dataVersion + ", old version = " + stat.getVersion()); LOG.info("pid=" + processorId + " published new version: " + newVersion + "; expected dataVersion = " + dataVersion + "(" + stat.getVersion() - + ")"); + + ")"); } /** * subscribe for changes of JobModel version + * * @param dataListener describe this */ public void subscribeToJobModelVersionChange(IZkDataListener dataListener) { @@ -192,13 +214,13 @@ public class ZkUtils { } zkClient.close(); - if(debounceTimer != null) + if (debounceTimer != null) debounceTimer.stopScheduler(); } public void deleteRoot() { String rootPath = keyBuilder.getRootPath(); - if(rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) { + if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) { LOG.info("pid=" + processorId + " Deleteing root: " + rootPath); zkClient.deleteRecursive(rootPath); } @@ -206,6 +228,7 @@ public class ZkUtils { class ZkStateChangeHandler implements IZkStateListener { private final ScheduleAfterDebounceTime debounceTimer; + public ZkStateChangeHandler(ScheduleAfterDebounceTime debounceTimer) { this.debounceTimer = debounceTimer; } @@ -217,7 +240,8 @@ public class ZkUtils { * @throws Exception On any error. */ @Override - public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { } + public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { + } /** * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
