We were trying to run an SGD example based on the ml4all example. This is the plan that we've written:

    PlanFunction planFunction = (operand, pb, hyperparams) -> {
        // Step 0: Cast operand and extract hyperparams
        List<Double> weights = (List<Double>) operand;
        String inputFileUrl = (String) hyperparams.get("inputFileUrl");
        int datasetSize = (int) hyperparams.get("datasetSize");

        // Step 1: Define ML operators
        Sample sampleOp = new SGDSample();
        Transform transformOp = new LibSVMTransform(29);
        Compute computeOp = new ComputeLogisticGradient();

        // Step 2: Create weight DataQuanta
        var weightsBuilder = pb
                .loadCollection(weights)
                .withName("weights");

        // Step 3: Load dataset and apply transform
        DataQuantaBuilder transformBuilder = (DataQuantaBuilder) pb
                .readTextFile(inputFileUrl)
                .withName("source")
                .mapPartitions(new TransformPerPartitionWrapper(transformOp))
                .withName("transform");

        Collection<?> parsedData = transformBuilder.collect();
        for (Object row : parsedData) {
            System.out.println(row);
        }

        // Step 4: Sample, compute gradient, and broadcast weights
        DataQuantaBuilder result = (DataQuantaBuilder) transformBuilder
                .sample(sampleOp.sampleSize())
                .withSampleMethod(sampleOp.sampleMethod())
                .withDatasetSize(datasetSize)
                .map(new ComputeWrapper<>(computeOp))
                .withBroadcast(weightsBuilder, "weights");

        // Step 5: Return final operator
        return result.dataQuanta().operator();
    };

For some reason it is not working: when we execute it, we get the following error:

    executing T[JavaTextFileSource[source]] failed.

Do you know why this might be? Thanks a lot.
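
Since the operator named in the error is the text file source, one thing that could be ruled out first is the value passed as inputFileUrl. Below is a minimal sketch in plain Java (it makes no Wayang calls) that reports whether a string parses as a URI, has a scheme, and, for file URLs, points at a readable regular file. The class InputUrlCheck and its describe method are hypothetical names introduced only for illustration. The idea that the source wants a URL with a scheme such as file:// rather than a bare path is an assumption, not something the error message confirms.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Wayang: sanity-checks the "inputFileUrl" value.
class InputUrlCheck {

    // Returns a short, human-readable diagnosis of the given URL string.
    static String describe(String inputFileUrl) {
        final URI uri;
        try {
            uri = new URI(inputFileUrl);
        } catch (URISyntaxException e) {
            return "not a valid URI: " + e.getMessage();
        }
        if (uri.getScheme() == null) {
            // Assumption: a bare path may need to be written as file:///absolute/path
            return "no scheme: bare path '" + inputFileUrl + "'";
        }
        if (!"file".equals(uri.getScheme())) {
            return "non-local scheme '" + uri.getScheme() + "': not checked here";
        }
        final Path path;
        try {
            path = Paths.get(uri);
        } catch (IllegalArgumentException e) {
            // Thrown e.g. for "file:relative/path", which is not a hierarchical URI
            return "file URI cannot be mapped to a local path: " + e.getMessage();
        }
        if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
            return "missing or unreadable file: " + path;
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Pass the same string that is stored under "inputFileUrl" in hyperparams.
        for (String arg : args) {
            System.out.println(arg + " -> " + describe(arg));
        }
    }
}
```

If this reports "ok" for the exact string stored in the hyperparams map, the input path is probably not the problem, and the full stack trace (the "Caused by" entries beneath the quoted message) would be the next thing to look at.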