This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-in-situ-data-services.git
commit 3ed32ba0579b09076176648dbb575afc2483da0f
Author: rileykk <[email protected]>
AuthorDate: Mon Feb 5 13:34:07 2024 -0800

    Handle ingesting previously ingested files

    - Check if the file has been ingested before
    - If so, query the stats index for all the parquet objects in S3 from the previous ingest job
    - After successful ingest, delete the old objects
    - Stats indexing lambda will clean up the stats index entries
---
 parquet_flask/aws/aws_s3.py         | 19 +++++++++
 parquet_flask/v1/ingest_aws_json.py | 78 ++++++++++++++++++++++++++++++++-----
 2 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/parquet_flask/aws/aws_s3.py b/parquet_flask/aws/aws_s3.py
index e9bcd49..840625c 100644
--- a/parquet_flask/aws/aws_s3.py
+++ b/parquet_flask/aws/aws_s3.py
@@ -140,3 +140,22 @@ class AwsS3(AwsCred):
         self.__s3_client.download_file(self.__target_bucket, self.__target_key, local_file_path)
         LOGGER.debug(f'file downloaded')
         return local_file_path
+
+    def delete_keys(self, bucket, keys):
+        # delete_objects accepts at most 1,000 keys per call, so delete in batches
+        batches = [keys[i:i + 1000] for i in range(0, len(keys), 1000)]
+        for batch in batches:
+            LOGGER.debug(f'Deleting {len(batch):,} objects')
+            resp = self.__s3_client.delete_objects(Bucket=bucket, Delete=dict(Objects=batch))
+            errors = resp.get('Errors', [])
+            if len(errors) > 0:
+                LOGGER.error(f'{len(errors):,} objects could not be deleted')
+                retries = 3
+                while len(errors) > 0 and retries > 0:
+                    LOGGER.debug(f'Retrying {len(errors)} objects')
+                    resp = self.__s3_client.delete_objects(
+                        Bucket=bucket,
+                        Delete=dict(Objects=[dict(Key=e['Key']) for e in errors])
+                    )
+                    errors = resp.get('Errors', [])
+                    retries -= 1
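For orientation, a minimal usage sketch of the new delete_keys helper follows. It is not part of the commit: the bucket name and key layout are placeholders, and it assumes AwsS3 is constructible with no arguments, as the diff below does.

    # Hypothetical caller of AwsS3.delete_keys (not part of this commit).
    # 'placeholder-bucket' and the key layout are illustrative only.
    from parquet_flask.aws.aws_s3 import AwsS3

    s3 = AwsS3()
    old_keys = [dict(Key=f'insitu.parquet/job_id=1234/part-{i:04d}.parquet')
                for i in range(2500)]  # 2,500 keys -> three delete_objects batches
    s3.delete_keys('placeholder-bucket', old_keys)

delete_keys slices the list into groups of 1,000 (the S3 delete_objects limit) and retries any keys reported back in the response's Errors list.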
diff --git a/parquet_flask/v1/ingest_aws_json.py b/parquet_flask/v1/ingest_aws_json.py
index ae331c3..833220f 100644
--- a/parquet_flask/v1/ingest_aws_json.py
+++ b/parquet_flask/v1/ingest_aws_json.py
@@ -39,7 +39,7 @@ class IngestAwsJsonProps:
         self.__s3_sha_url = None
         self.__uuid = str(uuid.uuid4())
         self.__working_dir = f'/tmp/{str(uuid.uuid4())}'
-        self.__is_replacing = False
+        self.__is_replacing = True
         self.__is_sanitizing = True
         self.__wait_till_complete = True
@@ -146,6 +146,8 @@
         config = Config()
         es_url = config.get_value(Config.es_url)
         es_port = int(config.get_value(Config.es_port, '443'))
+        self.__es_url = es_url
+        self.__es_port = es_port
         self.__es: ESAbstract = ESFactory().get_instance('AWS', index='', base_url=es_url, port=es_port)
         self.__db_io: MetadataTblInterface = MetadataTblES(self.__es)
@@ -219,9 +221,9 @@
                 FileUtils.del_file(self.__saved_file_name)
                 return {'message': 'failed to ingest to parquet', 'details': str(e)}, 500
         if self.__sha512_result is True:
-            return {'message': 'ingested', 'job_id': self.__props.uuid}, 201
+            return {'message': 'ingested', 'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 201
         return {'message': 'ingested, different sha512', 'cause': self.__sha512_cause,
-                'job_id': self.__props.uuid}, 203
+                'job_id': self.__props.uuid, 'reingest': self.__props.is_replacing}, 203

     def ingest(self):
         """
@@ -237,16 +239,49 @@
         try:
             LOGGER.debug(f'starting to ingest: {self.__props.s3_url}')
             existing_record = self.__db_io.get_by_s3_url(self.__props.s3_url)
-            if existing_record is None and self.__props.is_replacing is True:
-                LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
-                return {'message': 'unable to replace file as it is new'}, 500
             if existing_record is not None and self.__props.is_replacing is False:
-                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. ingested record: {existing_record}')
+                LOGGER.error(f'unable to ingest file as it is already ingested. {self.__props.s3_url}. '
+                             f'ingested record: {existing_record}')
                 return {'message': 'unable to ingest file as it is already ingested'}, 500
+            if existing_record is None and self.__props.is_replacing is True:
+                # LOGGER.error(f'unable to replace file as it is new. {self.__props.s3_url}')
+                # return {'message': 'unable to replace file as it is new'}, 500
+                LOGGER.info(f'File {self.__props.s3_url} is new and does not need to be replaced')
+                self.__props.is_replacing = False
+
             s3 = AwsS3().set_s3_url(self.__props.s3_url)
             LOGGER.debug(f'downloading s3 file: {self.__props.uuid}')
+            LOGGER.debug(f'Existing record: {existing_record}')
+
+            if existing_record is not None:
+                ingest_uuid_to_replace = existing_record['uuid']
+
+                stats_es: ESAbstract = ESFactory().get_instance('AWS', index='parquet_stats_alias',
+                                                                base_url=self.__es_url, port=self.__es_port)
+
+                parquet_stats = stats_es.query_with_scroll(
+                    {
+                        "query": {
+                            "wildcard": {
+                                "s3_url": {
+                                    "value": f"*/job_id={ingest_uuid_to_replace}/*"
+                                }
+                            }
+                        }
+                    },
+                    querying_index='parquet_stats_alias'
+                )
+
+                parquet_files = [s3.split_s3_url(doc['_id']) for doc in parquet_stats['hits']['hits']]
+
+                if len(parquet_files) == 0:
+                    LOGGER.warning(f'Found no S3 objects to delete for ingest job [{ingest_uuid_to_replace}] that '
+                                   f'needs to be replaced!')
+            else:
+                parquet_files = []
+
             FileUtils.mk_dir_p(self.__props.working_dir)
             self.__saved_file_name = s3.download(self.__props.working_dir)
             self.__file_sha512 = FileUtils.get_checksum(self.__saved_file_name)
@@ -255,9 +290,32 @@
             self.__saved_file_name = FileUtils.gunzip_file_os(self.__saved_file_name)
             self.__compare_sha512(self.__get_s3_sha512())
             if self.__props.wait_till_complete is True:
-                return self.__execute_ingest_data()
+                ret = self.__execute_ingest_data()
+
+                if existing_record is None or len(parquet_files) == 0:
+                    return ret
+                else:
+                    LOGGER.info(f'Deleting {len(parquet_files)} parquet objects from existing ingest')
+                    s3.delete_keys(
+                        parquet_files[0][0],
+                        [dict(Key=p[1]) for p in parquet_files]
+                    )
+                    return ret
             else:
-                bg_process = Process(target=self.__execute_ingest_data, args=())
+                if existing_record is None or len(parquet_files) == 0:
+                    bg_target = self.__execute_ingest_data
+                    args = ()
+                else:
+                    def ingest_then_delete(p_s3, p_parquet_files):
+                        self.__execute_ingest_data()
+                        p_s3.delete_keys(
+                            p_parquet_files[0][0],
+                            [dict(Key=p[1]) for p in p_parquet_files]
+                        )
+                    bg_target = ingest_then_delete
+                    args = (s3, parquet_files)
+
+                bg_process = Process(target=bg_target, args=args)
                 bg_process.daemon = True
                 bg_process.start()
                 return {'message': 'ingesting. Not waiting.', 'job_id': self.__props.uuid}, 204
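As a reference for the lookup above, the stats-index query can be exercised standalone roughly as follows. This sketch is not from the commit: the ESFactory import path, endpoint, and job uuid are assumptions; only the wildcard DSL and the query_with_scroll call mirror the diff.

    # Sketch of the stats-index lookup done before a re-ingest (not part of
    # the commit). Import path, endpoint, and uuid are assumed placeholders.
    from parquet_flask.aws.es_factory import ESFactory  # assumed module path

    ingest_uuid_to_replace = '00000000-0000-0000-0000-000000000000'  # prior job id
    stats_es = ESFactory().get_instance('AWS', index='parquet_stats_alias',
                                        base_url='example-es-endpoint', port=443)
    dsl = {
        'query': {
            'wildcard': {
                # parquet object urls embed the ingest job id as a partition,
                # e.g. s3://bucket/.../job_id=<uuid>/part-0000.parquet
                's3_url': {'value': f'*/job_id={ingest_uuid_to_replace}/*'}
            }
        }
    }
    results = stats_es.query_with_scroll(dsl, querying_index='parquet_stats_alias')
    old_urls = [doc['_id'] for doc in results['hits']['hits']]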

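Finally, the non-blocking path reduces to the fire-and-forget pattern below: run the ingest, then delete the superseded objects, inside a daemon process. Everything here is an illustrative stand-in, not code from the commit.

    # Self-contained sketch of the background ingest-then-delete pattern.
    # All names and values are illustrative placeholders.
    from multiprocessing import Process

    def fake_ingest():
        print('ingest: writing new parquet objects')

    def fake_delete(bucket, keys):
        print(f'delete: removing {len(keys)} old objects from {bucket}')

    def ingest_then_delete(ingest_fn, delete_fn, bucket, old_keys):
        ingest_fn()                  # write the replacement objects first
        delete_fn(bucket, old_keys)  # only then remove the superseded ones

    if __name__ == '__main__':
        bg = Process(target=ingest_then_delete,
                     args=(fake_ingest, fake_delete, 'placeholder-bucket',
                           [{'Key': 'insitu.parquet/job_id=old/part-0000.parquet'}]))
        bg.daemon = True  # the service does not wait on this worker
        bg.start()
        bg.join()         # joined here only so the demo prints before exiting

Running the delete strictly after the ingest call matches the commit message's intent that the old objects are removed only once the replacement data is in place.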