This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 4250-add-csv-dataset-import-with-preview-for-data-sets
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 30660e94dc1e177694975b4c0c9bfa7d2af6277d
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Mar 13 16:15:16 2026 +0100

    feat(#4250): Improve upload dialog
---
 .../datalake/importer/CsvImportPreviewRequest.java |   9 +
 .../datalake/importer/CsvImportPreviewResult.java  |   9 +
 .../model/datalake/importer/CsvImportRequest.java  |   9 +
 .../impl/datalake/CsvDataLakeImportService.java    | 578 ++++++++++++++++++---
 .../rest/impl/datalake/CsvImportUploadStorage.java | 130 +++++
 .../rest/impl/datalake/DataLakeDataWriter.java     |  47 +-
 .../rest/impl/datalake/DataLakeImportResource.java |  44 +-
 .../datalake/CsvDataLakeImportServiceTest.java     | 109 +++-
 .../rest/impl/datalake/DataLakeDataWriterTest.java |  56 ++
 ...achine-data-simulator-import-missing-values.csv |   8 +
 10 files changed, 914 insertions(+), 85 deletions(-)
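
For orientation: this commit moves CSV import to a two-step, upload-based flow.
A preview call stores the raw file server-side and returns an uploadId; the
import call then replays the stored file in batches. A minimal service-level
sketch (variable values such as the "timestamp" column name are hypothetical;
the classes and methods used are the ones introduced in the diff below):

    // file: a Spring MultipartFile; sid: the authenticated user's SID
    CsvImportPreviewResult preview = importService.preview(file, previewRequest, sid);

    CsvImportRequest importRequest = new CsvImportRequest();
    importRequest.setUploadId(preview.getUploadId());      // ties the import to the stored temp file
    importRequest.setColumns(preview.getColumns());        // column config inferred during preview
    importRequest.setTimestampColumn("timestamp");         // hypothetical column name
    importRequest.setCsvConfig(previewRequest.getCsvConfig());
    importRequest.setTarget(previewRequest.getTarget());

    CsvImportResult result = importService.importData(importRequest, sid);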

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
index 84e52dbeb1..7af3a0f44b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewRequest.java
@@ -22,12 +22,21 @@ import java.util.List;
 
 public class CsvImportPreviewRequest {
 
+  private String uploadId;
   private String fileName;
   private CsvImportConfiguration csvConfig;
   private List<String> headers;
   private List<List<String>> rows;
   private CsvImportTarget target;
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public String getFileName() {
     return fileName;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
index 6f6ebcf227..4bb9b84cf8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportPreviewResult.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 public class CsvImportPreviewResult {
 
+  private String uploadId;
   private List<String> headers = new ArrayList<>();
   private List<List<String>> previewRows = new ArrayList<>();
   private List<CsvImportColumn> columns = new ArrayList<>();
@@ -33,6 +34,14 @@ public class CsvImportPreviewResult {
   private boolean valid;
   private List<CsvImportValidationMessage> validationMessages = new ArrayList<>();
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public List<String> getHeaders() {
     return headers;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
index e571bae89e..c882562ae7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/importer/CsvImportRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 public class CsvImportRequest {
 
+  private String uploadId;
   private CsvImportConfiguration csvConfig;
   private List<String> headers = new ArrayList<>();
   private List<List<String>> rows = new ArrayList<>();
@@ -30,6 +31,14 @@ public class CsvImportRequest {
   private String timestampColumn;
   private List<CsvImportColumn> columns = new ArrayList<>();
 
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
   public CsvImportConfiguration getCsvConfig() {
     return csvConfig;
   }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
index a36980446e..8848170b08 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportService.java
@@ -22,7 +22,6 @@ import org.apache.streampipes.connect.management.util.EventSchemaUtils;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeriesBuilder;
-import org.apache.streampipes.model.datalake.SpQueryResult;
 import org.apache.streampipes.model.datalake.SpQueryResultBuilder;
 import org.apache.streampipes.model.datalake.importer.CsvImportColumn;
 import org.apache.streampipes.model.datalake.importer.CsvImportConfiguration;
@@ -44,6 +43,14 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
+import java.io.PushbackReader;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
@@ -61,47 +68,85 @@ import java.util.stream.Collectors;
 public class CsvDataLakeImportService {
 
   private static final int MAX_PREVIEW_ROWS = 50;
+  private static final int MAX_ANALYSIS_ROWS = 200;
+  private static final int IMPORT_BATCH_SIZE = 5000;
   private static final String STREAM_PREFIX = "s0::";
 
   private final IDataExplorerSchemaManagement schemaManagement;
   private final DataLakeDataWriter dataWriter;
+  private final CsvImportUploadStorage uploadStorage;
 
   public CsvDataLakeImportService(IDataExplorerSchemaManagement schemaManagement) {
     this(
         schemaManagement,
-        new DataLakeDataWriter(false)
+        new DataLakeDataWriter(false, true),
+        new CsvImportUploadStorage()
     );
   }
 
   CsvDataLakeImportService(
       IDataExplorerSchemaManagement schemaManagement,
       DataLakeDataWriter dataWriter
+  ) {
+    this(schemaManagement, dataWriter, new CsvImportUploadStorage());
+  }
+
+  CsvDataLakeImportService(
+      IDataExplorerSchemaManagement schemaManagement,
+      DataLakeDataWriter dataWriter,
+      CsvImportUploadStorage uploadStorage
   ) {
     this.schemaManagement = schemaManagement;
     this.dataWriter = dataWriter;
+    this.uploadStorage = uploadStorage;
   }
 
   public CsvImportPreviewResult preview(CsvImportPreviewRequest request) {
     var validationMessages = validatePreviewRequest(request);
     var headers = sanitizeHeaders(request.getHeaders());
     var rows = Optional.ofNullable(request.getRows()).orElseGet(Collections::emptyList);
+    return buildPreviewResult(request, headers, rows, validationMessages, null);
+  }
 
-    var columns = inferColumns(headers, rows, request.getCsvConfig());
-    var eventSchema = buildEventSchema(columns, rows, request.getCsvConfig(), null);
-    validationMessages.addAll(validatePreviewTarget(request.getTarget()));
+  public CsvImportPreviewResult preview(CsvImportPreviewRequest request, String principalSid) {
+    if (!hasUploadId(request)) {
+      return preview(request);
+    }
 
-    var result = new CsvImportPreviewResult();
-    result.setHeaders(headers);
-    result.setPreviewRows(rows.stream().limit(MAX_PREVIEW_ROWS).collect(Collectors.toList()));
-    result.setColumns(columns);
-    result.setGuessedEventSchema(eventSchema);
-    result.setTimestampCandidates(columns.stream()
-        .filter(CsvImportColumn::isTimestampCandidate)
-        .map(CsvImportColumn::getRuntimeName)
-        .collect(Collectors.toList()));
-    result.setValidationMessages(validationMessages);
-    result.setValid(validationMessages.isEmpty());
-    return result;
+    var validationMessages = validatePreviewConfiguration(request);
+    if (!validationMessages.isEmpty()) {
+      return buildInvalidPreviewResult(validationMessages, request.getUploadId());
+    }
+
+    try {
+      var upload = resolveUpload(request.getUploadId(), principalSid);
+      var csvSample = readCsvSample(upload.path(), request.getCsvConfig(), MAX_ANALYSIS_ROWS);
+      return buildPreviewResult(request, csvSample.headers(), csvSample.rows(), validationMessages, upload.uploadId());
+    } catch (CsvImportValidationException e) {
+      return buildInvalidPreviewResult(e.getValidationMessages(), request.getUploadId());
+    } catch (IOException | UncheckedIOException e) {
+      return buildInvalidPreviewResult(
+          List.of(message("file", "The CSV file could not be parsed with the current settings.")),
+          request.getUploadId()
+      );
+    }
+  }
+
+  public CsvImportPreviewResult preview(MultipartFile file, CsvImportPreviewRequest request, String principalSid)
+      throws IOException {
+    var validationMessages = validatePreviewConfiguration(request);
+    if (!validationMessages.isEmpty()) {
+      return buildInvalidPreviewResult(validationMessages, null);
+    }
+
+    var upload = uploadStorage.store(file, principalSid);
+    try {
+      var csvSample = readCsvSample(upload.path(), request.getCsvConfig(), MAX_ANALYSIS_ROWS);
+      return buildPreviewResult(request, csvSample.headers(), csvSample.rows(), validationMessages, upload.uploadId());
+    } catch (IOException | UncheckedIOException e) {
+      uploadStorage.remove(upload.uploadId());
+      throw e;
+    }
   }
 
   public CsvImportSchemaValidationResult validateSchema(CsvImportSchemaValidationRequest request) {
@@ -127,7 +172,11 @@ public class CsvDataLakeImportService {
   }
 
   public CsvImportResult importData(CsvImportRequest request, String principalSid) {
-    var validationMessages = validateImportRequest(request);
+    if (hasUploadId(request)) {
+      return importUploadedData(request, principalSid);
+    }
+
+    var validationMessages = validateInlineImportRequest(request);
     if (!validationMessages.isEmpty()) {
       throw new CsvImportValidationException(validationMessages);
     }
@@ -158,8 +207,14 @@ public class CsvDataLakeImportService {
       measure = requireExistingMeasurement(request.getTarget().getMeasurementName());
     }
 
-    var queryResult = toQueryResult(request);
-    dataWriter.writeData(measure, queryResult);
+    var queryResult = DataSeriesBuilder.create()
+        .withHeaders(request.getColumns().stream().map(CsvImportColumn::getRuntimeName).collect(Collectors.toList()))
+        .withRows(toImportRows(request))
+        .build();
+    dataWriter.writeData(
+        measure,
+        SpQueryResultBuilder.create(queryResult.getHeaders()).withDataSeries(queryResult).build()
+    );
 
     var result = new CsvImportResult();
     result.setMeasurementId(measure.getElementId());
@@ -170,12 +225,211 @@ public class CsvDataLakeImportService {
     return result;
   }
 
-  private List<CsvImportValidationMessage> validatePreviewRequest(CsvImportPreviewRequest request) {
+  private CsvImportResult importUploadedData(CsvImportRequest request, String principalSid) {
+    var validationMessages = validateStoredImportRequest(request);
+    if (!validationMessages.isEmpty()) {
+      throw new CsvImportValidationException(validationMessages);
+    }
+
+    var upload = resolveUpload(request.getUploadId(), principalSid);
+    var sanitizedColumns = sanitizeImportColumns(request.getColumns());
+    var eventSchema = buildConfiguredEventSchema(sanitizedColumns, request.getTimestampColumn());
+
+    validationMessages.addAll(validateImportTarget(request.getTarget(), eventSchema, request.getTimestampColumn()));
+    if (!validationMessages.isEmpty()) {
+      throw new CsvImportValidationException(validationMessages);
+    }
+
+    var createdNewMeasurement = false;
+    DataLakeMeasure measure;
+
+    if (request.getTarget().getMode() == CsvImportTargetMode.NEW) {
+      measure = new DataLakeMeasure();
+      measure.setMeasureName(request.getTarget().getMeasurementName().trim());
+      measure.setTimestampField(STREAM_PREFIX + request.getTimestampColumn());
+      measure.setEventSchema(eventSchema);
+      measure = schemaManagement.createOrUpdateMeasurement(measure, principalSid);
+      createdNewMeasurement = true;
+    } else {
+      measure = requireExistingMeasurement(request.getTarget().getMeasurementName());
+    }
+
+    try {
+      var importedRowCount = importCsvFile(upload.path(), request, measure);
+      uploadStorage.remove(upload.uploadId());
+
+      var result = new CsvImportResult();
+      result.setMeasurementId(measure.getElementId());
+      result.setMeasurementName(measure.getMeasureName());
+      result.setCreatedNewMeasurement(createdNewMeasurement);
+      result.setImportedRowCount(importedRowCount);
+      result.setValidationMessages(List.of());
+      return result;
+    } catch (IOException | UncheckedIOException e) {
+      throw new CsvImportValidationException(List.of(
+          message("file", "The CSV file could not be parsed with the current settings.")
+      ));
+    }
+  }
+
+  private CsvImportPreviewResult buildPreviewResult(
+      CsvImportPreviewRequest request,
+      List<String> headers,
+      List<List<String>> rows,
+      List<CsvImportValidationMessage> validationMessages,
+      String uploadId
+  ) {
+    var messages = new ArrayList<>(validationMessages);
+    var columns = inferColumns(headers, rows, request.getCsvConfig());
+    var eventSchema = buildEventSchema(columns, rows, request.getCsvConfig(), null);
+    messages.addAll(validatePreviewTarget(request.getTarget()));
+
+    var result = new CsvImportPreviewResult();
+    result.setUploadId(uploadId);
+    result.setHeaders(headers);
+    result.setPreviewRows(rows.stream().limit(MAX_PREVIEW_ROWS).collect(Collectors.toList()));
+    result.setColumns(columns);
+    result.setGuessedEventSchema(eventSchema);
+    result.setTimestampCandidates(columns.stream()
+        .filter(CsvImportColumn::isTimestampCandidate)
+        .map(CsvImportColumn::getRuntimeName)
+        .collect(Collectors.toList()));
+    result.setValidationMessages(messages);
+    result.setValid(messages.isEmpty());
+    return result;
+  }
+
+  private CsvImportPreviewResult buildInvalidPreviewResult(
+      List<CsvImportValidationMessage> validationMessages,
+      String uploadId
+  ) {
+    var result = new CsvImportPreviewResult();
+    result.setUploadId(uploadId);
+    result.setValidationMessages(validationMessages);
+    result.setValid(false);
+    return result;
+  }
+
+  private List<List<Object>> toImportRows(CsvImportRequest request) {
+    var rows = new ArrayList<List<Object>>();
+    for (int rowIndex = 0; rowIndex < request.getRows().size(); rowIndex++) {
+      rows.add(convertRow(request.getRows().get(rowIndex), request, rowIndex + 1));
+    }
+    return rows;
+  }
+
+  private int importCsvFile(Path path, CsvImportRequest request, DataLakeMeasure measure) throws IOException {
+    var runtimeHeaders = request.getColumns().stream()
+        .map(CsvImportColumn::getRuntimeName)
+        .collect(Collectors.toList());
+    var batch = new ArrayList<List<Object>>();
+    var importedRows = new int[]{0};
+
+    parseCsvFile(path, request.getCsvConfig(), new CsvRowConsumer() {
+      private List<String> parsedHeaders;
+
+      @Override
+      public void onHeaders(List<String> headers) {
+        parsedHeaders = headers;
+        validateUploadedHeaders(headers, request.getColumns());
+      }
+
+      @Override
+      public void onRow(int rowNumber, List<String> row) {
+        if (row.size() != parsedHeaders.size()) {
+          throw new CsvImportValidationException(List.of(
+              message("rows", "Row " + rowNumber + " does not match the header size.")
+          ));
+        }
+        batch.add(convertRow(row, request, rowNumber));
+        if (batch.size() >= IMPORT_BATCH_SIZE) {
+          flushImportBatch(measure, runtimeHeaders, batch);
+          importedRows[0] += IMPORT_BATCH_SIZE;
+          batch.clear();
+        }
+      }
+    });
+
+    if (!batch.isEmpty()) {
+      var batchSize = batch.size();
+      flushImportBatch(measure, runtimeHeaders, batch);
+      importedRows[0] += batchSize;
+    }
+
+    return importedRows[0];
+  }
+
+  private void flushImportBatch(DataLakeMeasure measure, List<String> runtimeHeaders, List<List<Object>> batch) {
+    dataWriter.writeData(measure, runtimeHeaders, new ArrayList<>(batch));
+  }
+
+  private List<Object> convertRow(List<String> row, CsvImportRequest request, int rowNumber) {
+    var converted = new ArrayList<Object>();
+    for (int i = 0; i < row.size(); i++) {
+      converted.add(convertValue(
+          row.get(i),
+          request.getColumns().get(i),
+          request.getCsvConfig(),
+          request.getTimestampColumn(),
+          rowNumber
+      ));
+    }
+    return converted;
+  }
+
+  private void validateUploadedHeaders(List<String> headers, List<CsvImportColumn> columns) {
+    if (headers.size() != columns.size()) {
+      throw new CsvImportValidationException(List.of(
+          message("headers", "The uploaded CSV file no longer matches the previewed column count.")
+      ));
+    }
+
+    for (int i = 0; i < headers.size(); i++) {
+      if (!Objects.equals(headers.get(i), columns.get(i).getCsvColumn())) {
+        throw new CsvImportValidationException(List.of(
+            message("headers", "The uploaded CSV file no longer matches the previewed headers.")
+        ));
+      }
+    }
+  }
+
+  private CsvImportUploadStorage.StoredUpload resolveUpload(String uploadId, String principalSid) {
+    var upload = uploadStorage.get(uploadId).orElseThrow(() -> new CsvImportValidationException(List.of(
+        message("uploadId", "The uploaded CSV file was not found. Please upload the file again.")
+    )));
+
+    if (!Objects.equals(upload.ownerSid(), principalSid)) {
+      throw new CsvImportValidationException(List.of(
+          message("uploadId", "The uploaded CSV file is no longer available for this user.")
+      ));
+    }
+
+    return upload;
+  }
+
+  private boolean hasUploadId(CsvImportPreviewRequest request) {
+    return request != null && request.getUploadId() != null && !request.getUploadId().isBlank();
+  }
+
+  private boolean hasUploadId(CsvImportRequest request) {
+    return request != null && request.getUploadId() != null && !request.getUploadId().isBlank();
+  }
+
+  private List<CsvImportValidationMessage> validatePreviewConfiguration(CsvImportPreviewRequest request) {
     var messages = new ArrayList<CsvImportValidationMessage>();
     if (request == null) {
       messages.add(message("request", "Import request must be provided."));
       return messages;
     }
+    validateCsvConfig(request.getCsvConfig(), messages);
+    return messages;
+  }
+
+  private List<CsvImportValidationMessage> validatePreviewRequest(CsvImportPreviewRequest request) {
+    var messages = validatePreviewConfiguration(request);
+    if (request == null) {
+      return messages;
+    }
     if (request.getHeaders() == null || request.getHeaders().isEmpty()) {
       messages.add(message("headers", "At least one header must be provided."));
     }
@@ -183,14 +437,12 @@ public class CsvDataLakeImportService {
       messages.add(message("rows", "At least one row must be provided."));
     }
     validateRowsMatchHeaders(request.getHeaders(), request.getRows(), messages);
-    validateCsvConfig(request.getCsvConfig(), messages);
     return messages;
   }
 
-  private List<CsvImportValidationMessage> validateImportRequest(CsvImportRequest request) {
-    var messages = new ArrayList<CsvImportValidationMessage>();
+  private List<CsvImportValidationMessage> validateInlineImportRequest(CsvImportRequest request) {
+    var messages = validateStoredImportRequest(request);
+    var messages = validateStoredImportRequest(request);
     if (request == null) {
-      messages.add(message("request", "Import request must be provided."));
       return messages;
     }
     if (request.getHeaders() == null || request.getHeaders().isEmpty()) {
@@ -200,6 +452,15 @@ public class CsvDataLakeImportService {
       messages.add(message("rows", "At least one row must be provided."));
     }
     validateRowsMatchHeaders(request.getHeaders(), request.getRows(), messages);
+    return messages;
+  }
+
+  private List<CsvImportValidationMessage> validateStoredImportRequest(CsvImportRequest request) {
+    var messages = new ArrayList<CsvImportValidationMessage>();
+    if (request == null) {
+      messages.add(message("request", "Import request must be provided."));
+      return messages;
+    }
     validateCsvConfig(request.getCsvConfig(), messages);
 
     if (request.getTarget() == null || request.getTarget().getMode() == null) {
@@ -211,6 +472,11 @@ public class CsvDataLakeImportService {
     if (request.getColumns() == null || request.getColumns().isEmpty()) {
       messages.add(message("columns", "Column configuration must be provided."));
     }
+    if (!hasUploadId(request)
+        && (request.getRows() == null || request.getRows().isEmpty())
+        && (request.getHeaders() == null || request.getHeaders().isEmpty())) {
+      messages.add(message("uploadId", "Either an uploadId or inline CSV rows must be provided."));
+    }
     return messages;
   }
 
@@ -397,8 +663,7 @@ public class CsvDataLakeImportService {
         continue;
       }
 
-      if (!Objects.equals(getRuntimeType(entry.getValue()), getRuntimeType(imported))
-      ) {
+      if (!Objects.equals(getRuntimeType(entry.getValue()), getRuntimeType(imported))) {
         issues.add(issue(
             CsvImportSchemaIssueType.COLUMN_TYPE_MISMATCH,
             entry.getKey(),
@@ -427,28 +692,6 @@ public class CsvDataLakeImportService {
         )));
   }
 
-  private SpQueryResult toQueryResult(CsvImportRequest request) {
-    var headers = request.getColumns().stream()
-        .map(CsvImportColumn::getRuntimeName)
-        .collect(Collectors.toList());
-    var rows = new ArrayList<List<Object>>();
-    for (var row : request.getRows()) {
-      var converted = new ArrayList<Object>();
-      for (int i = 0; i < row.size(); i++) {
-        converted.add(convertValue(row.get(i), request.getColumns().get(i), request.getCsvConfig(), request.getTimestampColumn()));
-      }
-      rows.add(converted);
-    }
-
-    var dataSeries = DataSeriesBuilder.create()
-        .withHeaders(headers)
-        .withRows(rows)
-        .build();
-    return SpQueryResultBuilder.create(headers)
-        .withDataSeries(dataSeries)
-        .build();
-  }
-
   private EventSchema buildEventSchema(
       List<CsvImportColumn> columns,
       List<List<String>> rows,
@@ -555,7 +798,6 @@ public class CsvDataLakeImportService {
     var allBoolean = true;
     var allLong = true;
     var allNumber = true;
-    var timestampCandidate = false;
 
     for (var row : rows) {
       var value = row.get(columnIndex);
@@ -573,7 +815,7 @@ public class CsvDataLakeImportService {
       }
     }
 
-    timestampCandidate = isTimestampCandidate(rows, columnIndex, config);
+    var timestampCandidate = isTimestampCandidate(rows, columnIndex, config);
 
     if (timestampCandidate || allLong) {
       return "LONG";
@@ -621,21 +863,53 @@ public class CsvDataLakeImportService {
       CsvImportColumn column,
       CsvImportConfiguration config,
       String timestampColumn
+  ) {
+    return convertValue(rawValue, column, config, timestampColumn, null);
+  }
+
+  private Object convertValue(
+      String rawValue,
+      CsvImportColumn column,
+      CsvImportConfiguration config,
+      String timestampColumn,
+      Integer rowNumber
   ) {
     if (rawValue == null || rawValue.isBlank()) {
+      if (rowNumber != null && Objects.equals(column.getRuntimeName(), timestampColumn)) {
+        throw new CsvImportValidationException(List.of(
+            message(
+                "rows",
+                "Row " + rowNumber + " is missing a value for timestamp column 
\"" + column.getCsvColumn() + "\"."
+            )
+        ));
+      }
       return null;
     }
+
     var trimmed = rawValue.trim();
-    if (Objects.equals(column.getRuntimeName(), timestampColumn)) {
-      return parseTimestamp(trimmed, config);
-    }
+    try {
+      if (Objects.equals(column.getRuntimeName(), timestampColumn)) {
+        return parseTimestamp(trimmed, config);
+      }
 
-    return switch (finalRuntimeType(column, timestampColumn)) {
-      case "BOOLEAN" -> 
Boolean.parseBoolean(trimmed.toLowerCase(Locale.ENGLISH));
-      case "LONG" -> Long.parseLong(normalizeNumber(trimmed, config));
-      case "FLOAT" -> Double.parseDouble(normalizeNumber(trimmed, config));
-      default -> trimmed;
-    };
+      return switch (finalRuntimeType(column, timestampColumn)) {
+        case "BOOLEAN" -> 
Boolean.parseBoolean(trimmed.toLowerCase(Locale.ENGLISH));
+        case "LONG" -> Long.parseLong(normalizeNumber(trimmed, config));
+        case "FLOAT" -> Double.parseDouble(normalizeNumber(trimmed, config));
+        default -> trimmed;
+      };
+    } catch (RuntimeException e) {
+      if (rowNumber == null) {
+        throw e;
+      }
+
+      throw new CsvImportValidationException(List.of(
+          message(
+              "rows",
+              "Row " + rowNumber + " contains an invalid value for column \"" 
+ column.getCsvColumn() + "\"."
+          )
+      ));
+    }
   }
 
   private long parseTimestamp(String value, CsvImportConfiguration config) {
@@ -736,14 +1010,190 @@ public class CsvDataLakeImportService {
     return new CsvImportSchemaIssue(type, columnName, expected, actual);
   }
 
-  private String normalize(String value) {
-    return value == null ? null : value.trim().toLowerCase(Locale.ENGLISH);
-  }
-
   private String getRuntimeType(EventProperty property) {
     if (property instanceof EventPropertyPrimitive primitive) {
       return primitive.getRuntimeType();
     }
     return null;
   }
+
+  private CsvFileSample readCsvSample(Path path, CsvImportConfiguration config, int maxRows) throws IOException {
+    var headers = new ArrayList<String>();
+    var rows = new ArrayList<List<String>>();
+
+    parseCsvFile(path, config, new CsvRowConsumer() {
+      @Override
+      public void onHeaders(List<String> parsedHeaders) {
+        headers.addAll(parsedHeaders);
+      }
+
+      @Override
+      public void onRow(int rowNumber, List<String> row) {
+        if (row.size() != headers.size()) {
+          throw new CsvImportValidationException(List.of(
+              message("rows", "Row " + rowNumber + " does not match the header size.")
+          ));
+        }
+        if (rows.size() < maxRows) {
+          rows.add(row);
+        }
+      }
+    });
+
+    if (headers.isEmpty()) {
+      throw new CsvImportValidationException(List.of(message("headers", "At least one header must be provided.")));
+    }
+    if (rows.isEmpty()) {
+      throw new CsvImportValidationException(List.of(message("rows", "At least one row must be provided.")));
+    }
+
+    return new CsvFileSample(headers, rows);
+  }
+
+  private void parseCsvFile(Path path, CsvImportConfiguration config, CsvRowConsumer consumer) throws IOException {
+    try (var reader = new PushbackReader(Files.newBufferedReader(path, StandardCharsets.UTF_8), 1)) {
+      var delimiter = normalizeDelimiter(config == null ? null : config.getDelimiter());
+      var hasHeader = config == null || config.isHasHeader();
+      List<String> headers = null;
+      int rowNumber = 0;
+      List<String> row;
+
+      while ((row = readNextRow(reader, delimiter)) != null) {
+        if (isBlankRow(row)) {
+          continue;
+        }
+
+        if (headers == null) {
+          if (hasHeader) {
+            headers = normalizeHeaders(row);
+            consumer.onHeaders(headers);
+          } else {
+            headers = generateHeaders(row.size());
+            consumer.onHeaders(headers);
+            rowNumber += 1;
+            consumer.onRow(rowNumber, row);
+          }
+        } else {
+          rowNumber += 1;
+          consumer.onRow(rowNumber, row);
+        }
+      }
+    }
+  }
+
+  private List<String> readNextRow(PushbackReader reader, char delimiter) throws IOException {
+    var currentRow = new ArrayList<String>();
+    var currentValue = new StringBuilder();
+    var inQuotes = false;
+    var readAny = false;
+
+    while (true) {
+      var nextInt = reader.read();
+      if (nextInt == -1) {
+        if (!readAny && currentValue.length() == 0 && currentRow.isEmpty()) {
+          return null;
+        }
+        currentRow.add(currentValue.toString());
+        return currentRow;
+      }
+
+      readAny = true;
+      var currentChar = (char) nextInt;
+      if (currentChar == '"') {
+        if (inQuotes) {
+          var escapedCandidate = reader.read();
+          if (escapedCandidate == '"') {
+            currentValue.append('"');
+          } else {
+            inQuotes = false;
+            if (escapedCandidate != -1) {
+              reader.unread(escapedCandidate);
+            }
+          }
+        } else {
+          inQuotes = true;
+        }
+      } else if (!inQuotes && currentChar == delimiter) {
+        currentRow.add(currentValue.toString());
+        currentValue = new StringBuilder();
+      } else if (!inQuotes && (currentChar == '\n' || currentChar == '\r')) {
+        if (currentChar == '\r') {
+          var maybeLineFeed = reader.read();
+          if (maybeLineFeed != '\n' && maybeLineFeed != -1) {
+            reader.unread(maybeLineFeed);
+          }
+        }
+        currentRow.add(currentValue.toString());
+        return currentRow;
+      } else {
+        currentValue.append(currentChar);
+      }
+    }
+  }
+
+  private char normalizeDelimiter(String delimiter) {
+    if (delimiter == null || delimiter.isEmpty()) {
+      return ',';
+    }
+    if ("\\t".equals(delimiter)) {
+      return '\t';
+    }
+    return delimiter.charAt(0);
+  }
+
+  private List<String> normalizeHeaders(List<String> headers) {
+    var normalized = new ArrayList<String>();
+    for (int i = 0; i < headers.size(); i++) {
+      var value = headers.get(i);
+      if (i == 0) {
+        value = stripBom(value);
+      }
+      var trimmed = value == null ? "" : value.trim();
+      normalized.add(trimmed.isEmpty() ? "column_" + (i + 1) : trimmed);
+    }
+    return normalized;
+  }
+
+  private String stripBom(String value) {
+    return value == null ? null : value.replace("\uFEFF", "");
+  }
+
+  private List<String> generateHeaders(int size) {
+    var headers = new ArrayList<String>();
+    for (int i = 0; i < size; i++) {
+      headers.add("column_" + (i + 1));
+    }
+    return headers;
+  }
+
+  private boolean isBlankRow(List<String> row) {
+    return row.stream().allMatch(cell -> cell == null || cell.trim().isEmpty());
+  }
+
+  @FunctionalInterface
+  private interface CsvRowConsumer {
+    default void onHeaders(List<String> headers) {
+    }
+
+    void onRow(int rowNumber, List<String> row);
+  }
+
+  private static final class CsvFileSample {
+
+    private final List<String> headers;
+    private final List<List<String>> rows;
+
+    private CsvFileSample(List<String> headers, List<List<String>> rows) {
+      this.headers = headers;
+      this.rows = rows;
+    }
+
+    List<String> headers() {
+      return headers;
+    }
+
+    List<List<String>> rows() {
+      return rows;
+    }
+  }
 }
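
The readNextRow parser above implements RFC-4180-style quoting by hand. A small
self-contained sketch (illustrative only, not part of the commit; the class
name is made up) that mirrors its semantics: quotes toggle a quoted state, a
doubled quote inside a quoted field becomes a literal quote, and delimiters or
line breaks only split outside quotes, with CRLF collapsed to one row break:

    import java.io.IOException;
    import java.io.PushbackReader;
    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.List;

    class CsvQuoteHandlingDemo {

      public static void main(String[] args) throws IOException {
        try (var reader = new PushbackReader(new StringReader("a,\"b,\"\"c\"\"\",d\r\n"), 1)) {
          System.out.println(readRow(reader, ',')); // prints [a, b,"c", d]
        }
      }

      static List<String> readRow(PushbackReader reader, char delimiter) throws IOException {
        var row = new ArrayList<String>();
        var value = new StringBuilder();
        var inQuotes = false;
        int c;
        while ((c = reader.read()) != -1) {
          if (c == '"') {
            if (inQuotes) {
              int next = reader.read();
              if (next == '"') {
                value.append('"');          // doubled quote inside a quoted field
              } else {
                inQuotes = false;           // closing quote; push back the lookahead
                if (next != -1) {
                  reader.unread(next);
                }
              }
            } else {
              inQuotes = true;
            }
          } else if (!inQuotes && c == delimiter) {
            row.add(value.toString());
            value.setLength(0);
          } else if (!inQuotes && (c == '\n' || c == '\r')) {
            if (c == '\r') {
              int lf = reader.read();       // collapse CRLF into a single row break
              if (lf != '\n' && lf != -1) {
                reader.unread(lf);
              }
            }
            break;
          } else {
            value.append((char) c);
          }
        }
        row.add(value.toString());
        return row;
      }
    }
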
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java
new file mode 100644
index 0000000000..0651034f72
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/CsvImportUploadStorage.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+class CsvImportUploadStorage {
+
+  private static final Duration DEFAULT_TTL = Duration.ofHours(12);
+
+  private final ConcurrentMap<String, StoredUpload> uploads;
+  private final Duration ttl;
+
+  CsvImportUploadStorage() {
+    this(DEFAULT_TTL);
+  }
+
+  CsvImportUploadStorage(Duration ttl) {
+    this.uploads = new ConcurrentHashMap<>();
+    this.ttl = ttl;
+  }
+
+  StoredUpload store(MultipartFile file, String ownerSid) throws IOException {
+    cleanupExpired();
+
+    var uploadId = UUID.randomUUID().toString();
+    var tempFile = Files.createTempFile("streampipes-csv-import-", ".csv");
+    try (var inputStream = file.getInputStream()) {
+      Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
+    }
+
+    var upload = new StoredUpload(
+        uploadId,
+        tempFile,
+        ownerSid,
+        Instant.now()
+    );
+    uploads.put(uploadId, upload);
+    return upload;
+  }
+
+  Optional<StoredUpload> get(String uploadId) {
+    cleanupExpired();
+    return Optional.ofNullable(uploads.get(uploadId));
+  }
+
+  void remove(String uploadId) {
+    var removed = uploads.remove(uploadId);
+    if (removed != null) {
+      deleteQuietly(removed.path());
+    }
+  }
+
+  private void cleanupExpired() {
+    var expiresBefore = Instant.now().minus(ttl);
+    uploads.entrySet().removeIf(entry -> {
+      var expired = entry.getValue().createdAt().isBefore(expiresBefore);
+      if (expired) {
+        deleteQuietly(entry.getValue().path());
+      }
+      return expired;
+    });
+  }
+
+  private void deleteQuietly(Path path) {
+    try {
+      Files.deleteIfExists(path);
+    } catch (IOException ignored) {
+      // Best-effort cleanup of temporary uploads.
+    }
+  }
+
+  static final class StoredUpload {
+
+    private final String uploadId;
+    private final Path path;
+    private final String ownerSid;
+    private final Instant createdAt;
+
+    StoredUpload(String uploadId, Path path, String ownerSid, Instant createdAt) {
+      this.uploadId = uploadId;
+      this.path = path;
+      this.ownerSid = ownerSid;
+      this.createdAt = createdAt;
+    }
+
+    String uploadId() {
+      return uploadId;
+    }
+
+    Path path() {
+      return path;
+    }
+
+    String ownerSid() {
+      return ownerSid;
+    }
+
+    Instant createdAt() {
+      return createdAt;
+    }
+  }
+}
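
A brief usage sketch of the upload lifecycle (illustrative only; it assumes
same-package access, since the class is package-private, uses Spring's
MockMultipartFile from spring-test, and the one-minute TTL and class name are
arbitrary choices):

    package org.apache.streampipes.rest.impl.datalake;

    import org.springframework.mock.web.MockMultipartFile;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;

    class CsvImportUploadStorageSketch {

      public static void main(String[] args) throws Exception {
        var storage = new CsvImportUploadStorage(Duration.ofMinutes(1));
        var file = new MockMultipartFile(
            "file", "data.csv", "text/csv",
            "timestamp,value\n1710000000000,1.0\n".getBytes(StandardCharsets.UTF_8)
        );

        // store() copies the stream to a temp file and returns an owner-bound handle
        var upload = storage.store(file, "owner-sid");
        System.out.println(storage.get(upload.uploadId()).isPresent()); // true

        // remove() deletes the temp file and forgets the uploadId
        storage.remove(upload.uploadId());
        System.out.println(storage.get(upload.uploadId()).isPresent()); // false
      }
    }
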
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
index a722b28484..c841e88a34 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriter.java
@@ -31,17 +31,24 @@ import org.apache.streampipes.storage.management.StorageDispatcher;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class DataLakeDataWriter {
 
   private final boolean ignoreSchemaMismatch;
+  private final boolean allowMissingFields;
 
   public DataLakeDataWriter(boolean ignoreSchemaMismatch) {
+    this(ignoreSchemaMismatch, false);
+  }
+
+  public DataLakeDataWriter(boolean ignoreSchemaMismatch, boolean allowMissingFields) {
     this.ignoreSchemaMismatch = ignoreSchemaMismatch;
+    this.allowMissingFields = allowMissingFields;
   }
 
   public void writeData(String measureName, SpQueryResult queryResult) {
@@ -57,6 +64,14 @@ public class DataLakeDataWriter {
     getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
   }
 
+  public void writeData(DataLakeMeasure measure, List<String> headers, List<List<Object>> rows) {
+    var dataSeries = new DataSeries();
+    dataSeries.setHeaders(headers);
+    dataSeries.setRows(rows);
+    dataSeries.setTotal(rows.size());
+    getTimeSeriesStoreAndPersistQueryResult(dataSeries, measure);
+  }
+
   private void getTimeSeriesStoreAndPersistQueryResult(DataSeries dataSeries,
                                                        DataLakeMeasure measure){
     var timeSeriesStore = getTimeSeriesStore(measure);
@@ -99,13 +114,24 @@ public class DataLakeDataWriter {
           .collect(Collectors.toSet());
       var runtimeNameSet = new HashSet<>(runtimeNames);
 
-      if (!runtimeNameSet.equals(strippedEventKeys)){
+      if (!matchesRuntimeNames(runtimeNameSet, strippedEventKeys, allowMissingFields)) {
         throw new SpRuntimeException("The fields of the event do not match. Use \"ignoreSchemaMismatch\" to "
             + "ignore this error. Fields of the event: " + strippedEventKeys);
       }
     }
   }
 
+  static boolean matchesRuntimeNames(
+      Set<String> expectedRuntimeNames,
+      Set<String> actualRuntimeNames,
+      boolean allowMissingFields
+  ) {
+    if (allowMissingFields) {
+      return expectedRuntimeNames.containsAll(actualRuntimeNames);
+    }
+    return expectedRuntimeNames.equals(actualRuntimeNames);
+  }
+
   private List<String> getRuntimeNames(DataLakeMeasure measure) {
     var runtimeNames = new ArrayList<String>();
     runtimeNames.add(measure.getTimestampFieldName());
@@ -124,10 +150,18 @@ public class DataLakeDataWriter {
   }
 
   private Event rowToEvent(List<Object> row, List<String> headers){
-    Map<String, Object> eventMap = IntStream.range(0, headers.size())
-        .boxed()
-        .collect(Collectors.toMap(headers::get, row::get));
-    return EventFactory.fromMap(eventMap);
+    return EventFactory.fromMap(toEventMap(row, headers));
+  }
+
+  static Map<String, Object> toEventMap(List<Object> row, List<String> headers) {
+    var eventMap = new LinkedHashMap<String, Object>();
+    for (int i = 0; i < headers.size(); i++) {
+      var value = row.get(i);
+      if (value != null) {
+        eventMap.put(headers.get(i), value);
+      }
+    }
+    return eventMap;
   }
 
   private void renameTimestampField(Event event, String timestampField){
@@ -137,4 +171,3 @@ public class DataLakeDataWriter {
   }
 
 }
-
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
index a5cebe0d44..d680e08fc3 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeImportResource.java
@@ -33,7 +33,11 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestPart;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.multipart.MultipartFile;
+
+import java.io.IOException;
 
 @RestController
 @RequestMapping("/api/v4/datalake/import")
@@ -53,13 +57,26 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   )
   @PreAuthorize("this.hasWriteAuthority()")
   public ResponseEntity<CsvImportPreviewResult> preview(@RequestBody CsvImportPreviewRequest request) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
+      return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
+    }
+    return ok(importService.preview(request, getAuthenticatedUserSid()));
+  }
+
+  @PostMapping(
+      path = "/preview",
+      consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
+      produces = MediaType.APPLICATION_JSON_VALUE
+  )
+  @PreAuthorize("this.hasWriteAuthority()")
+  public ResponseEntity<CsvImportPreviewResult> preview(
+      @RequestPart("file") MultipartFile file,
+      @RequestPart("request") CsvImportPreviewRequest request
+  ) throws IOException {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
-    return ok(importService.preview(request));
+    return ok(importService.preview(file, request, getAuthenticatedUserSid()));
   }
 
   @PostMapping(
@@ -71,10 +88,7 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   public ResponseEntity<CsvImportSchemaValidationResult> validateSchema(
       @RequestBody CsvImportSchemaValidationRequest request
   ) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
     return ok(importService.validateSchema(request));
@@ -86,10 +100,7 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
   )
   @PreAuthorize("this.hasWriteAuthority()")
   public ResponseEntity<CsvImportResult> importData(@RequestBody CsvImportRequest request) {
-    if (request.getTarget() != null
-        && request.getTarget().getMode() == CsvImportTargetMode.EXISTING
-        && this.dataLakeMeasureManagement.getExistingMeasureByName(request.getTarget().getMeasurementName()).isPresent()
-        && !this.checkPermissionByName(request.getTarget().getMeasurementName(), "WRITE")) {
+    if (!hasWritePermission(request.getTarget())) {
       return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
     }
 
@@ -101,4 +112,11 @@ public class DataLakeImportResource extends AbstractDataLakeResource {
       return ResponseEntity.badRequest().body(result);
     }
   }
+
+  private boolean hasWritePermission(org.apache.streampipes.model.datalake.importer.CsvImportTarget target) {
+    return target == null
+        || target.getMode() != CsvImportTargetMode.EXISTING
+        || this.dataLakeMeasureManagement.getExistingMeasureByName(target.getMeasurementName()).isEmpty()
+        || this.checkPermissionByName(target.getMeasurementName(), "WRITE");
+  }
 }
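
The new multipart mapping expects two parts, "file" and "request". A sketch of
calling it with Spring's MockMvc (the JSON field names follow the model classes
in this diff under default Jackson naming; the measurement name, CSV payload,
and class name are made up for illustration):

    import org.springframework.mock.web.MockMultipartFile;
    import org.springframework.test.web.servlet.MockMvc;

    import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
    import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
    import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

    class CsvImportPreviewEndpointSketch {

      void previewUploadedCsv(MockMvc mockMvc) throws Exception {
        var csv = new MockMultipartFile(
            "file", "machine-data.csv", "text/csv",
            "timestamp,temperature\n1710000000000,21.3\n".getBytes()
        );
        var request = new MockMultipartFile(
            "request", "", "application/json",
            ("{\"csvConfig\": {\"delimiter\": \",\", \"decimalSeparator\": \".\", \"hasHeader\": true},"
                + " \"target\": {\"mode\": \"NEW\", \"measurementName\": \"demo-measure\"}}").getBytes()
        );

        mockMvc.perform(multipart("/api/v4/datalake/import/preview").file(csv).file(request))
            .andExpect(status().isOk())
            .andExpect(jsonPath("$.uploadId").exists()); // feeds the subsequent import call
      }
    }
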
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
index 9224094a7f..c8a9a3283a 100644
--- a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
+++ b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/CsvDataLakeImportServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.rest.impl.datalake;
 
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
 import org.apache.streampipes.model.datalake.importer.CsvImportColumn;
 import org.apache.streampipes.model.datalake.importer.CsvImportConfiguration;
 import org.apache.streampipes.model.datalake.importer.CsvImportPreviewRequest;
@@ -34,7 +35,9 @@ import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.vocabulary.XSD;
 
 import org.junit.jupiter.api.Test;
+import org.springframework.web.multipart.MultipartFile;
 
+import java.io.ByteArrayInputStream;
 import java.util.List;
 import java.util.Optional;
 
@@ -43,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -118,7 +122,102 @@ class CsvDataLakeImportServiceTest {
     assertTrue(result.isCreatedNewMeasurement());
     assertEquals(2, result.getImportedRowCount());
     assertEquals("new-measure", result.getMeasurementName());
-    verify(dataWriter).writeData(any(DataLakeMeasure.class), any());
+    verify(dataWriter).writeData(any(DataLakeMeasure.class), any(SpQueryResult.class));
+  }
+
+  @Test
+  void shouldPreviewOnceAndImportFromStoredUpload() throws Exception {
+    var schemaManagement = mock(IDataExplorerSchemaManagement.class);
+    var dataWriter = mock(DataLakeDataWriter.class);
+    var uploadStorage = new CsvImportUploadStorage();
+    var service = new CsvDataLakeImportService(schemaManagement, dataWriter, uploadStorage);
+
+    when(schemaManagement.getExistingMeasureByName("uploaded-measure"))
+        .thenReturn(Optional.empty());
+    when(schemaManagement.createOrUpdateMeasurement(any(DataLakeMeasure.class), any()))
+        .thenAnswer(invocation -> {
+          var measure = invocation.getArgument(0, DataLakeMeasure.class);
+          measure.setElementId("measure-id");
+          return measure;
+        });
+
+    var previewRequest = new CsvImportPreviewRequest();
+    previewRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    previewRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+
+    var multipartFile = mock(MultipartFile.class);
+    when(multipartFile.getOriginalFilename()).thenReturn("upload.csv");
+    when(multipartFile.getInputStream()).thenReturn(new ByteArrayInputStream(
+        "timestamp,temperature\n1710000000000,21.3\n1710000060000,22.1\n".getBytes()
+    ));
+
+    var previewResult = service.preview(
+        multipartFile,
+        previewRequest,
+        "sid"
+    );
+
+    assertTrue(previewResult.isValid());
+    assertEquals(2, previewResult.getPreviewRows().size());
+    assertTrue(previewResult.getUploadId() != null && !previewResult.getUploadId().isBlank());
+
+    var importRequest = new CsvImportRequest();
+    importRequest.setUploadId(previewResult.getUploadId());
+    importRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    importRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+    importRequest.setTimestampColumn("timestamp");
+    importRequest.setColumns(previewResult.getColumns());
+
+    var importResult = service.importData(importRequest, "sid");
+
+    assertTrue(importResult.isCreatedNewMeasurement());
+    assertEquals(2, importResult.getImportedRowCount());
+    verify(dataWriter).writeData(any(DataLakeMeasure.class), anyList(), anyList());
+  }
+
+  @Test
+  void shouldRejectMissingTimestampValuesInUploadedCsv() throws Exception {
+    var schemaManagement = mock(IDataExplorerSchemaManagement.class);
+    var dataWriter = mock(DataLakeDataWriter.class);
+    var uploadStorage = new CsvImportUploadStorage();
+    var service = new CsvDataLakeImportService(schemaManagement, dataWriter, uploadStorage);
+
+    when(schemaManagement.getExistingMeasureByName("uploaded-measure"))
+        .thenReturn(Optional.empty());
+    when(schemaManagement.createOrUpdateMeasurement(any(DataLakeMeasure.class), any()))
+        .thenAnswer(invocation -> {
+          var measure = invocation.getArgument(0, DataLakeMeasure.class);
+          measure.setElementId("measure-id");
+          return measure;
+        });
+
+    var previewRequest = new CsvImportPreviewRequest();
+    previewRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    previewRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+
+    var multipartFile = mock(MultipartFile.class);
+    when(multipartFile.getOriginalFilename()).thenReturn("upload.csv");
+    when(multipartFile.getInputStream()).thenReturn(new ByteArrayInputStream(
+        "timestamp,temperature\n1710000000000,21.3\n,22.1\n".getBytes()
+    ));
+
+    var previewResult = service.preview(multipartFile, previewRequest, "sid");
+
+    var importRequest = new CsvImportRequest();
+    importRequest.setUploadId(previewResult.getUploadId());
+    importRequest.setCsvConfig(makeCsvConfigWithCommaDelimiter());
+    importRequest.setTarget(makeTarget(CsvImportTargetMode.NEW, "uploaded-measure"));
+    importRequest.setTimestampColumn("timestamp");
+    importRequest.setColumns(previewResult.getColumns());
+
+    var exception = assertThrows(
+        CsvImportValidationException.class,
+        () -> service.importData(importRequest, "sid")
+    );
+
+    assertTrue(exception.getValidationMessages()
+        .stream()
+        .anyMatch(message -> message.getMessage().contains("missing a value for timestamp column")));
   }
 
   @Test
@@ -202,6 +301,14 @@ class CsvDataLakeImportServiceTest {
     return config;
   }
 
+  private CsvImportConfiguration makeCsvConfigWithCommaDelimiter() {
+    var config = new CsvImportConfiguration();
+    config.setDelimiter(",");
+    config.setDecimalSeparator(".");
+    config.setHasHeader(true);
+    return config;
+  }
+
   private EventSchema makeExistingSchema() {
     var timestamp = new EventPropertyPrimitive();
     timestamp.setRuntimeName("timestamp");
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java
new file mode 100644
index 0000000000..ebeee70cfb
--- /dev/null
+++ b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/DataLakeDataWriterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl.datalake;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DataLakeDataWriterTest {
+
+  @Test
+  void shouldSkipNullValuesWhenBuildingEventMap() {
+    var headers = List.of("timestamp", "temperature", "status");
+    List<Object> row = new java.util.ArrayList<>();
+    row.add(1710000000000L);
+    row.add(null);
+    row.add("ok");
+
+    var eventMap = DataLakeDataWriter.toEventMap(row, headers);
+
+    assertEquals(2, eventMap.size());
+    assertEquals(1710000000000L, eventMap.get("timestamp"));
+    assertEquals("ok", eventMap.get("status"));
+    assertFalse(eventMap.containsKey("temperature"));
+  }
+
+  @Test
+  void shouldAllowMissingFieldsWhenConfigured() {
+    var expected = new HashSet<>(List.of("timestamp", "temperature", "status"));
+    var actual = new HashSet<>(List.of("timestamp", "status"));
+
+    assertTrue(DataLakeDataWriter.matchesRuntimeNames(expected, actual, true));
+    assertFalse(DataLakeDataWriter.matchesRuntimeNames(expected, actual, false));
+  }
+}
diff --git a/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv b/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv
new file mode 100644
index 0000000000..8c3c0cce83
--- /dev/null
+++ b/ui/cypress/fixtures/datalake/machine-data-simulator-import-missing-values.csv
@@ -0,0 +1,8 @@
+timestamp,dmc,druckoelraumAnlagenkompensation,druckoelraumBauteiltemperatur1,druckoelraumDifferenztemperatur,druckoelraumDrehzahl,druckoelraumDruck
+1772722801725,0.0,0.0,,0.0,0.0,0.0
+1772722802743,0,0.0,0.0,0.0,0.0,0.0
+1772722803756,0.0,,0.0,0.0,0.0,0.0
+1772722804768,0.0,0.0,,0.0,0.0,0.0
+1772722805777,0.0,0.0,,,0.0,0.0
+1772722806784,0.0,0.0,0.0,0.0,,0.0
+1772722807791,0.0,0.0,0.0,0.0,0.0,
