jiangxin369 commented on code in PR #250:
URL: https://github.com/apache/flink-ml/pull/250#discussion_r1298134912

##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/OperatorScopeManagedMemoryManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A manager for operator-scope managed memory.
+ *
+ * <p>Every operator must create one (and only one) instance of this class to manage usages of

Review Comment:
   Instead of allowing users to create `OperatorScopeManagedMemoryManager` directly, would it be better to provide an `OperatorScopeManagedMemoryManagerBroker`, annotated `@PublicEvolving`, that manages the `OperatorScopeManagedMemoryManager` instances of all operators? This would help ensure that only one instance exists per operator, as sketched below.
   ```
   @PublicEvolving
   class OperatorScopeManagedMemoryManagerBroker {
       private Map<OperatorID, OperatorScopeManagedMemoryManager> managers;

       public static OperatorScopeManagedMemoryManager getOrCreate(OperatorID id) {}
   }
   ```


##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/OperatorScopeManagedMemoryManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A manager for operator-scope managed memory.
+ *
+ * <p>Every operator must create one (and only one) instance of this class to manage usages of
+ * operator-scope managed memery. In the operator, for every usage, {@link #register} is called to

Review Comment:
   ```suggestion
    * operator-scope managed memory. In the operator, for every usage, {@link #register} is called to
   ```


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##########
@@ -239,9 +240,13 @@ public void initializeState(StateInitializationContext context) throws Exception
                     context.getOperatorStateStore()
                             .getListState(new ListStateDescriptor<>("centroids", type));
 
+            OperatorScopeManagedMemoryManager manager = new OperatorScopeManagedMemoryManager();
+            manager.register("points-state", 1.);
             points =
                     new ListStateWithCache<>(
                             new VectorWithNormSerializer(),
+                            manager,
+                            "points-state",

Review Comment:
   The string `"points-state"` is used twice here. It is recommended to declare reused strings as `static final` constants.


##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/OperatorScopeManagedMemoryManager.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A manager for operator-scope managed memory.
+ *
+ * <p>Every operator must create one (and only one) instance of this class to manage usages of
+ * operator-scope managed memery. In the operator, for every usage, {@link #register} is called to
+ * declare its weight of memory usage with a key. Then, the fraction of memory can be obtained by
+ * calling {@link #getFraction}.
+ *
+ * <p>Note that all calls of {@link #register} must be before those of {@link #getFraction}.
+ */
+public class OperatorScopeManagedMemoryManager {
+
+    protected Map<String, Double> weights = new HashMap<>();
+    protected boolean frozen = false;
+    protected double sum;
+
+    public void register(String key, double weight) {
+        Preconditions.checkState(!frozen, "Cannot call register after getFraction is called.");
+        Preconditions.checkArgument(weight >= 0, "The weight must be non-negative.");
+        weights.put(key, weight);

Review Comment:
   Would it be better to throw an exception when a duplicate key is registered, instead of silently overwriting the existing weight?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
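
For reference, the suggestions in the review comments above (a broker that hands out one manager per operator, a duplicate-key check in `register`, and a constant for the reused key) could fit together roughly as sketched below. This is only an illustration, not the PR's code: the class names `ManagedMemoryManagerSketch`, `ManagedMemoryManagerBrokerSketch`, and `MemoryManagerSketchDemo` are hypothetical, a plain `String` stands in for Flink's `OperatorID`, plain exceptions stand in for `Preconditions`, and `getFraction` is assumed to return the key's weight divided by the sum of all weights, since its body is not shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for OperatorScopeManagedMemoryManager (hypothetical, not the PR's code). */
class ManagedMemoryManagerSketch {
    private final Map<String, Double> weights = new HashMap<>();
    private boolean frozen = false;
    private double sum;

    void register(String key, double weight) {
        if (frozen) {
            throw new IllegalStateException("Cannot call register after getFraction is called.");
        }
        if (weight < 0) {
            throw new IllegalArgumentException("The weight must be non-negative.");
        }
        // Reject duplicates instead of silently overwriting the earlier weight.
        if (weights.containsKey(key)) {
            throw new IllegalArgumentException("Key already registered: " + key);
        }
        weights.put(key, weight);
    }

    // Assumption: the fraction is the key's weight normalized by the total weight.
    double getFraction(String key) {
        if (!frozen) {
            // The first call freezes registration and fixes the total weight.
            frozen = true;
            sum = weights.values().stream().mapToDouble(Double::doubleValue).sum();
        }
        Double weight = weights.get(key);
        if (weight == null) {
            throw new IllegalArgumentException("Unknown key: " + key);
        }
        return sum == 0 ? 0 : weight / sum;
    }
}

/** Hypothetical broker; a String id stands in for Flink's OperatorID. */
final class ManagedMemoryManagerBrokerSketch {
    private static final Map<String, ManagedMemoryManagerSketch> MANAGERS =
            new ConcurrentHashMap<>();

    private ManagedMemoryManagerBrokerSketch() {}

    // Returns the existing manager for the operator, creating it on first use.
    static ManagedMemoryManagerSketch getOrCreate(String operatorId) {
        return MANAGERS.computeIfAbsent(operatorId, id -> new ManagedMemoryManagerSketch());
    }
}

class MemoryManagerSketchDemo {
    // The reused key is declared once as a constant.
    private static final String POINTS_STATE_KEY = "points-state";

    public static void main(String[] args) {
        ManagedMemoryManagerSketch manager =
                ManagedMemoryManagerBrokerSketch.getOrCreate("kmeans-operator");
        manager.register(POINTS_STATE_KEY, 1.0);
        System.out.println(manager.getFraction(POINTS_STATE_KEY));
    }
}
```

A static map in the broker keeps the "one manager per operator" rule in a single place, but a real implementation would also need a way to remove entries when an operator is closed, which this sketch leaves out.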
