phet commented on a change in pull request #3427:
URL: https://github.com/apache/gobblin/pull/3427#discussion_r754553904

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinZkHelixManager.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link ZKHelixManager} which keeps a reference count of users.
+ * Every user should call connect and disconnect to increase and decrease the count.
+ * Calls to connect and disconnect to the underlying ZKHelixManager are made only for the first and last usage respectively.
+ */
+@Slf4j
+public class GobblinZkHelixManager extends ZKHelixManager {

Review comment:
I'd encourage a more specific name, tracking to how it works, such as `GobblinReferenceCountingZkHelixManager`

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
##########
@@ -160,30 +159,28 @@ protected void addLeadershipChangeAwareComponent (LeadershipChangeAwareComponent
   /**
    * Build the {@link HelixManager} for the Application Master.
    */
-  protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) {
+  protected static HelixManager buildHelixManager(Config config, String clusterName, InstanceType type) {
+    Preconditions.checkArgument(config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
     String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, GobblinClusterManager.class.getSimpleName());
-    return HelixManagerFactory.getZKHelixManager(
+    return GobblinHelixManagerFactory.getZKHelixManager(

Review comment:
ok... I see how this has been used, so apparently not written to a common interface... in that case, probably not worth big changes now.

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinZkHelixManager.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link ZKHelixManager} which keeps a reference count of users.
+ * Every user should call connect and disconnect to increase and decrease the count.
+ * Calls to connect and disconnect to the underlying ZKHelixManager are made only for the first and last usage respectively.
+ */
+@Slf4j
+public class GobblinZkHelixManager extends ZKHelixManager {
+  final AtomicInteger usageCount = new AtomicInteger(0);

Review comment:
`private`?

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+
+
+public class GobblinHelixManagerFactory {
+
+  public static HelixManager getZKHelixManager(String clusterName, String instanceName,
+      InstanceType type, String zkAddr) {
+    return new GobblinZkHelixManager(clusterName, instanceName, type, zkAddr);

Review comment:
two things: first, surprised there's no interface declaration, as is typical, so that different implementations of the factory could be used interchangeably. second, not a big deal, but the method name and return type name usually match, whereas the specific instance type (created) might match the class name (granted, there are more considerations when a common interface is being implemented).

##########
File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
##########
@@ -203,6 +209,7 @@ public TaskResult run() {
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
     } finally {
+      this.jobHelixManager.disconnect();

Review comment:
still wondering on this... doesn't the `.connect()` belong prior to the start of the `try` block?
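
To illustrate the last question, and the reference counting described in the `GobblinZkHelixManager` Javadoc, here is a minimal standalone sketch. It does not use the Helix API: `FakeConnection`, `ReferenceCountingConnection`, and `ConnectBeforeTry.runJob` are hypothetical names made up for this example, and the counter is a plain `int` guarded by `synchronized` rather than the `AtomicInteger` in the PR. The point is only that when `connect()` sits before the `try`, the `finally` block runs `disconnect()` exactly when the matching `connect()` succeeded, so the count stays balanced.

```java
/** Hypothetical stand-in for the underlying manager; NOT the Helix ZKHelixManager API. */
class FakeConnection {
  int connects = 0;
  int disconnects = 0;

  void connect() { connects++; }

  void disconnect() { disconnects++; }
}

/** Forwards only the first connect and the last disconnect to the delegate. */
class ReferenceCountingConnection {
  private final FakeConnection delegate;
  private int usageCount = 0; // private, guarded by "this"

  ReferenceCountingConnection(FakeConnection delegate) {
    this.delegate = delegate;
  }

  synchronized void connect() {
    if (usageCount == 0) {
      delegate.connect(); // first user opens the real connection
    }
    usageCount++; // counted only after the delegate call did not throw
  }

  synchronized void disconnect() {
    if (usageCount == 0) {
      return; // unbalanced call; ignoring it is an assumption of this sketch
    }
    usageCount--;
    if (usageCount == 0) {
      delegate.disconnect(); // last user closes the real connection
    }
  }

  synchronized int usageCount() {
    return usageCount;
  }
}

class ConnectBeforeTry {
  /** Runs the work with a connection held; returns a status string. */
  static String runJob(ReferenceCountingConnection manager, Runnable work) {
    // connect() is outside the try: if it throws, the finally block below
    // never runs, so disconnect() cannot decrement a count it never raised.
    manager.connect();
    try {
      work.run();
      return "COMPLETED";
    } catch (RuntimeException e) {
      return "FAILED";
    } finally {
      manager.disconnect(); // always paired with the connect() above
    }
  }
}
```

With `connect()` inside the `try` instead, a failure in `connect()` would still reach `finally` and release a reference that some other user may be holding.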
