lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890784808
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##########
@@ -254,58 +272,150 @@ public Tuple3<Integer, DenseVector, Long> map(Tuple2<Integer, DenseVector> value
DenseVector, DenseVector[], Tuple2<Integer, DenseVector>>,
IterationListener<Tuple2<Integer, DenseVector>> {
private final DistanceMeasure distanceMeasure;
- private ListState<DenseVector> points;
- private ListState<DenseVector[]> centroids;
+ private ListState<DenseVector[]> centroidsState;
+ private DenseVector[] centroids;
+
+ private Path basePath;
+ private OperatorID operatorID;
+ private MemorySegmentPool segmentPool;
+ private DataCacheWriter<DenseVector> dataCacheWriter;
public SelectNearestCentroidOperator(DistanceMeasure distanceMeasure) {
+ super();
this.distanceMeasure = distanceMeasure;
}
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<Tuple2<Integer, DenseVector>>> output) {
+ super.setup(containingTask, config, output);
+
+ operatorID = config.getOperatorID();
+
+ MemoryManager memoryManager = getContainingTask().getEnvironment().getMemoryManager();
Review Comment:
Could you refactor the code so that algorithm developers won't need to
handle so many details?
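Here is a framework-free Java sketch of the kind of refactoring this comment asks for. The cache plumbing that `setup()` currently wires by hand (operator ID, base path, memory segment pool, `DataCacheWriter`) moves behind one helper, so the algorithm operator only adds records and reads them back. `CacheContext`, `DataCache`, and `NearestCentroidOperator` are hypothetical names, not Flink or Flink ML APIs. An in-memory list stands in for the segment-backed cache.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Framework-free sketch; every name here is hypothetical, not a Flink ML API. */
public class CachedDataSketch {

    /** Hypothetical bundle of the details setup() currently collects by hand. */
    static final class CacheContext {
        final String operatorId;
        final String basePath;
        final int memorySegments; // stands in for the MemorySegmentPool size (unused in this in-memory sketch)

        CacheContext(String operatorId, String basePath, int memorySegments) {
            this.operatorId = operatorId;
            this.basePath = basePath;
            this.memorySegments = memorySegments;
        }
    }

    /** Hypothetical helper owning all cache plumbing; an in-memory list replaces real segments and files. */
    static final class DataCache<T> {
        private final String location;
        private final List<T> records = new ArrayList<>();

        private DataCache(String location) {
            this.location = location;
        }

        /** Single entry point: derives the cache location from the context so operators don't have to. */
        static <T> DataCache<T> create(CacheContext context) {
            return new DataCache<>(context.basePath + "/" + context.operatorId);
        }

        void add(T record) {
            records.add(record);
        }

        List<T> readAll() {
            return Collections.unmodifiableList(records);
        }

        String location() {
            return location;
        }
    }

    /** Algorithm operator: only nearest-centroid logic, no memory or path handling. */
    static final class NearestCentroidOperator {
        private final DataCache<double[]> points;

        NearestCentroidOperator(CacheContext context) {
            this.points = DataCache.create(context);
        }

        void processPoint(double[] point) {
            points.add(point);
        }

        String cacheLocation() {
            return points.location();
        }

        /** For each cached point, returns the index of the nearest centroid by squared Euclidean distance. */
        int[] assign(double[][] centroids) {
            List<double[]> data = points.readAll();
            int[] result = new int[data.size()];
            for (int i = 0; i < data.size(); i++) {
                double best = Double.MAX_VALUE;
                for (int c = 0; c < centroids.length; c++) {
                    double dist = 0;
                    for (int k = 0; k < data.get(i).length; k++) {
                        double diff = data.get(i)[k] - centroids[c][k];
                        dist += diff * diff;
                    }
                    if (dist < best) {
                        best = dist;
                        result[i] = c;
                    }
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        NearestCentroidOperator op =
                new NearestCentroidOperator(new CacheContext("op-1", "/tmp/cache", 4));
        op.processPoint(new double[] {0, 0});
        op.processPoint(new double[] {10, 10});
        op.processPoint(new double[] {1, 1});
        // prints [0, 1, 0]
        System.out.println(Arrays.toString(op.assign(new double[][] {{0, 0}, {9, 9}})));
    }
}
```

With this split, replacing the in-memory list with a real segment-backed cache would touch only `DataCache`, not every algorithm operator that caches data.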