Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/23#discussion_r87836931
--- Diff: libminifi/include/Provenance.h ---
@@ -0,0 +1,902 @@
+/**
+ * @file Provenance.h
+ * Flow file record class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_H__
+#define __PROVENANCE_H__
+
+#include <stdio.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <set>
+#include <cassert>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <ftw.h>
+#include "leveldb/db.h"
+
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+#include "ResourceClaim.h"
+#include "Relationship.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+
+// Provenance Event Record Serialization Seg Size
+#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+
+class ProvenanceRepository;
+
+//! Provenance Event Record
+class ProvenanceEventRecord
+{
+public:
+ enum ProvenanceEventType {
+
+ /**
+ * A CREATE event is used when a FlowFile is generated from data
that was
+ * not received from a remote system or external process
+ */
+ CREATE,
+
+ /**
+ * Indicates a provenance event for receiving data from an external
process. This Event Type
+ * is expected to be the first event for a FlowFile. As such, a
Processor that receives data
+ * from an external source and uses that data to replace the
content of an existing FlowFile
+ * should use the {@link #FETCH} event type, rather than the
RECEIVE event type.
+ */
+ RECEIVE,
+
+ /**
+ * Indicates that the contents of a FlowFile were overwritten using
the contents of some
+ * external resource. This is similar to the {@link #RECEIVE} event
but varies in that
+ * RECEIVE events are intended to be used as the event that
introduces the FlowFile into
+ * the system, whereas FETCH is used to indicate that the contents
of an existing FlowFile
+ * were overwritten.
+ */
+ FETCH,
+
+ /**
+ * Indicates a provenance event for sending data to an external
process
+ */
+ SEND,
+
+ /**
+ * Indicates that the contents of a FlowFile were downloaded by a
user or external entity.
+ */
+ DOWNLOAD,
+
+ /**
+ * Indicates a provenance event for the conclusion of an object's
life for
+ * some reason other than object expiration
+ */
+ DROP,
+
+ /**
+ * Indicates a provenance event for the conclusion of an object's
life due
+ * to the fact that the object could not be processed in a timely
manner
+ */
+ EXPIRE,
+
+ /**
+ * FORK is used to indicate that one or more FlowFile was derived
from a
+ * parent FlowFile.
+ */
+ FORK,
+
+ /**
+ * JOIN is used to indicate that a single FlowFile is derived from
joining
+ * together multiple parent FlowFiles.
+ */
+ JOIN,
+
+ /**
+ * CLONE is used to indicate that a FlowFile is an exact duplicate
of its
+ * parent FlowFile.
+ */
+ CLONE,
+
+ /**
+ * CONTENT_MODIFIED is used to indicate that a FlowFile's content
was
+ * modified in some way. When using this Event Type, it is
advisable to
+ * provide details about how the content is modified.
+ */
+ CONTENT_MODIFIED,
+
+ /**
+ * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's
attributes were
+ * modified in some way. This event is not needed when another
event is
+ * reported at the same time, as the other event will already
contain all
+ * FlowFile attributes.
+ */
+ ATTRIBUTES_MODIFIED,
+
+ /**
+ * ROUTE is used to show that a FlowFile was routed to a specified
+ * {@link org.apache.nifi.processor.Relationship Relationship} and
should provide
+ * information about why the FlowFile was routed to this
relationship.
+ */
+ ROUTE,
+
+ /**
+ * Indicates a provenance event for adding additional information
such as a
+ * new linkage to a new URI or UUID
+ */
+ ADDINFO,
+
+ /**
+ * Indicates a provenance event for replaying a FlowFile. The UUID
of the
+ * event will indicate the UUID of the original FlowFile that is
being
+ * replayed. The event will contain exactly one Parent UUID that is
also the
+ * UUID of the FlowFile that is being replayed and exactly one
Child UUID
+ * that is the UUID of the a newly created FlowFile that will be
re-queued
+ * for processing.
+ */
+ REPLAY
+ };
+ friend class ProcessSession;
+public:
+ //! Constructor
+ /*!
+ * Create a new provenance event record
+ */
+ ProvenanceEventRecord(ProvenanceEventType event, std::string
componentId, std::string componentType) {
+ _eventType = event;
+ _componentId = componentId;
+ _componentType = componentType;
+ _eventTime = getTimeMillis();
+ char eventIdStr[37];
+ // Generate the global UUID for th event
+ uuid_generate(_eventId);
+ uuid_unparse(_eventId, eventIdStr);
+ _eventIdStr = eventIdStr;
+ _serializedBuf = NULL;
+ _serializeBufSize = 0;
+ _maxSerializeBufSize = 0;
+ _logger = Logger::getLogger();
+ }
+
+ ProvenanceEventRecord() {
+ _eventTime = getTimeMillis();
+ _serializedBuf = NULL;
+ _serializeBufSize = 0;
+ _maxSerializeBufSize = 0;
+ _logger = Logger::getLogger();
+ }
+
+ //! Destructor
+ virtual ~ProvenanceEventRecord() {
+ }
+ //! Get the Event ID
+ std::string getEventId() {
+ return _eventIdStr;
+ }
+ //! Get Attributes
+ std::map<std::string, std::string> getAttributes() {
+ return _attributes;
+ }
+ //! Get Size
+ uint64_t getFileSize() {
+ return _size;
+ }
+ // ! Get Offset
+ uint64_t getFileOffset() {
+ return _offset;
+ }
+ // ! Get Entry Date
+ uint64_t getFlowFileEntryDate() {
+ return _entryDate;
+ }
+ // ! Get Lineage Start Date
+ uint64_t getlineageStartDate() {
+ return _lineageStartDate;
+ }
+ // ! Get Event Time
+ uint64_t getEventTime() {
+ return _eventTime;
+ }
+ // ! Get Event Duration
+ uint64_t getEventDuration() {
+ return _eventDuration;
+ }
+ //! Set Event Duration
+ void setEventDuration(uint64_t duration)
+ {
+ _eventDuration = duration;
+ }
+ // ! Get Event Type
+ ProvenanceEventType getEventType() {
+ return _eventType;
+ }
+ //! Get Component ID
+ std::string getComponentId()
+ {
+ return _componentId;
+ }
+ //! Get Component Type
+ std::string getComponentType()
+ {
+ return _componentType;
+ }
+ //! Get FlowFileUuid
+ std::string getFlowFileUuid()
+ {
+ return _uuid;
+ }
+ //! Get content full path
+ std::string getContentFullPath()
+ {
+ return _contentFullPath;
+ }
+ //! Get LineageIdentifiers
+ std::set<std::string> getLineageIdentifiers()
+ {
+ return _lineageIdentifiers;
+ }
+ //! Get Details
+ std::string getDetails()
+ {
+ return _details;
+ }
+ //! Set Details
+ void setDetails(std::string details)
+ {
+ _details = details;
+ }
+ //! Get TransitUri
+ std::string getTransitUri()
+ {
+ return _transitUri;
+ }
+ //! Set TransitUri
+ void setTransitUri(std::string uri)
+ {
+ _transitUri = uri;
+ }
+ //! Get SourceSystemFlowFileIdentifier
+ std::string getSourceSystemFlowFileIdentifier()
+ {
+ return _sourceSystemFlowFileIdentifier;
+ }
+ //! Set SourceSystemFlowFileIdentifier
+ void setSourceSystemFlowFileIdentifier(std::string identifier)
+ {
+ _sourceSystemFlowFileIdentifier = identifier;
+ }
+ //! Get Parent UUIDs
+ std::vector<std::string> getParentUuids()
+ {
+ return _parentUuids;
+ }
+ //! Add Parent UUID
+ void addParentUuid(std::string uuid)
+ {
+ if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid)
!= _parentUuids.end())
+ return;
+ else
+ _parentUuids.push_back(uuid);
+ }
+ //! Add Parent Flow File
+ void addParentFlowFile(FlowFileRecord *flow)
+ {
+ addParentUuid(flow->getUUIDStr());
+ return;
+ }
+ //! Remove Parent UUID
+ void removeParentUuid(std::string uuid)
+ {
+ _parentUuids.erase(std::remove(_parentUuids.begin(),
_parentUuids.end(), uuid), _parentUuids.end());
+ }
+ //! Remove Parent Flow File
+ void removeParentFlowFile(FlowFileRecord *flow)
+ {
+ removeParentUuid(flow->getUUIDStr());
+ return;
+ }
+ //! Get Children UUIDs
+ std::vector<std::string> getChildrenUuids()
+ {
+ return _childrenUuids;
+ }
+ //! Add Child UUID
+ void addChildUuid(std::string uuid)
+ {
+ if (std::find(_childrenUuids.begin(), _childrenUuids.end(),
uuid) != _childrenUuids.end())
+ return;
+ else
+ _childrenUuids.push_back(uuid);
+ }
+ //! Add Child Flow File
+ void addChildFlowFile(FlowFileRecord *flow)
+ {
+ addChildUuid(flow->getUUIDStr());
+ return;
+ }
+ //! Remove Child UUID
+ void removeChildUuid(std::string uuid)
+ {
+ _childrenUuids.erase(std::remove(_childrenUuids.begin(),
_childrenUuids.end(), uuid), _childrenUuids.end());
+ }
+ //! Remove Child Flow File
+ void removeChildFlowFile(FlowFileRecord *flow)
+ {
+ removeChildUuid(flow->getUUIDStr());
+ return;
+ }
+ //! Get AlternateIdentifierUri
+ std::string getAlternateIdentifierUri()
+ {
+ return _alternateIdentifierUri;
+ }
+ //! Set AlternateIdentifierUri
+ void setAlternateIdentifierUri(std::string uri)
+ {
+ _alternateIdentifierUri = uri;
+ }
+ //! Get Relationship
+ std::string getRelationship()
+ {
+ return _relationship;
+ }
+ //! Set Relationship
+ void setRelationship(std::string relation)
+ {
+ _relationship = relation;
+ }
+ //! Get sourceQueueIdentifier
+ std::string getSourceQueueIdentifier()
+ {
+ return _sourceQueueIdentifier;
+ }
+ //! Set sourceQueueIdentifier
+ void setSourceQueueIdentifier(std::string identifier)
+ {
+ _sourceQueueIdentifier = identifier;
+ }
+ //! fromFlowFile
+ void fromFlowFile(FlowFileRecord *flow)
+ {
+ _entryDate = flow->getEntryDate();
+ _lineageStartDate = flow->getlineageStartDate();
+ _lineageIdentifiers = flow->getlineageIdentifiers();
+ _uuid = flow->getUUIDStr();
+ _attributes = flow->getAttributes();
+ _size = flow->getSize();
+ _offset = flow->getOffset();
+ if (flow->getOriginalConnection())
+ _sourceQueueIdentifier =
flow->getOriginalConnection()->getName();
+ if (flow->getResourceClaim())
+ {
+ _contentFullPath =
flow->getResourceClaim()->getContentFullPath();
+ }
+ }
+ //! Serialize and Persistent to the repository
+ bool Serialize(ProvenanceRepository *repo);
+ //! DeSerialize
+ bool DeSerialize(uint8_t *buffer, int bufferSize);
+ //! DeSerialize
+ bool DeSerialize(ProvenanceRepository *repo, std::string key);
+
+protected:
+
+ //! Event type
+ ProvenanceEventType _eventType;
+ //! Date at which the event was created
+ uint64_t _eventTime;
+ //! Date at which the flow file entered the flow
+ uint64_t _entryDate;
+ //! Date at which the origin of this flow file entered the flow
+ uint64_t _lineageStartDate;
+ //! Event Duration
+ uint64_t _eventDuration;
+ //! Component ID
+ std::string _componentId;
+ //! Component Type
+ std::string _componentType;
+ //! Size in bytes of the data corresponding to this flow file
+ uint64_t _size;
+ //! flow uuid
+ std::string _uuid;
+ //! Offset to the content
+ uint64_t _offset;
+ //! Full path to the content
+ std::string _contentFullPath;
+ //! Attributes key/values pairs for the flow record
+ std::map<std::string, std::string> _attributes;
+ //! provenance ID
+ uuid_t _eventId;
+ //! UUID string for all parents
+ std::set<std::string> _lineageIdentifiers;
+ //! transitUri
+ std::string _transitUri;
+ //! sourceSystemFlowFileIdentifier
+ std::string _sourceSystemFlowFileIdentifier;
+ //! parent UUID
+ std::vector<std::string> _parentUuids;
+ //! child UUID
+ std::vector<std::string> _childrenUuids;
+ //! detail
+ std::string _details;
+ //! sourceQueueIdentifier
+ std::string _sourceQueueIdentifier;
+ //! event ID Str
+ std::string _eventIdStr;
+ //! relationship
+ std::string _relationship;
+ //! alternateIdentifierUri;
+ std::string _alternateIdentifierUri;
+
+private:
+
+ //! Logger
+ Logger *_logger;
+ // All serialization related method and internal buf
+ uint8_t *_serializedBuf;
+ int _serializeBufSize;
+ int _maxSerializeBufSize;
+ int writeData(uint8_t *value, int size)
+ {
+ if ((_serializeBufSize + size) > _maxSerializeBufSize)
+ {
+ // if write exceed
+ uint8_t *buffer = new uint8_t[_maxSerializeBufSize +
PROVENANCE_EVENT_RECORD_SEG_SIZE];
+ if (!buffer)
+ {
+ return -1;
+ }
+ memcpy(buffer, _serializedBuf, _serializeBufSize);
+ delete[] _serializedBuf;
+ _serializedBuf = buffer;
+ _maxSerializeBufSize = _maxSerializeBufSize +
PROVENANCE_EVENT_RECORD_SEG_SIZE;
+ }
+ uint8_t *bufPtr = _serializedBuf + _serializeBufSize;
+ memcpy(bufPtr, value, size);
+ _serializeBufSize += size;
+ return size;
+ }
+ int readData(uint8_t *buf, int buflen)
+ {
+ if ((buflen + _serializeBufSize) > _maxSerializeBufSize)
+ {
+ // if read exceed
+ return -1;
+ }
+ uint8_t *bufPtr = _serializedBuf + _serializeBufSize;
+ memcpy(buf, bufPtr, buflen);
+ _serializeBufSize += buflen;
+ return buflen;
+ }
+ int write(uint8_t value)
+ {
+ return writeData(&value, 1);
+ }
+ int write(char value)
+ {
+ return writeData((uint8_t *)&value, 1);
+ }
+ int write(uint32_t value)
+ {
+ uint8_t temp[4];
+
+ temp[0] = (value & 0xFF000000) >> 24;
+ temp[1] = (value & 0x00FF0000) >> 16;
+ temp[2] = (value & 0x0000FF00) >> 8;
+ temp[3] = (value & 0x000000FF);
+ return writeData(temp, 4);
+ }
+ int write(uint16_t value)
+ {
+ uint8_t temp[2];
+ temp[0] = (value & 0xFF00) >> 8;
+ temp[1] = (value & 0xFF);
+ return writeData(temp, 2);
+ }
+ int write(uint8_t *value, int len)
+ {
+ return writeData(value, len);
+ }
+ int write(uint64_t value)
+ {
+ uint8_t temp[8];
+
+ temp[0] = (value >> 56) & 0xFF;
+ temp[1] = (value >> 48) & 0xFF;
+ temp[2] = (value >> 40) & 0xFF;
+ temp[3] = (value >> 32) & 0xFF;
+ temp[4] = (value >> 24) & 0xFF;
+ temp[5] = (value >> 16) & 0xFF;
+ temp[6] = (value >> 8) & 0xFF;
+ temp[7] = (value >> 0) & 0xFF;
+ return writeData(temp, 8);
+ }
+ int write(bool value)
+ {
+ uint8_t temp = value;
+ return write(temp);
+ }
+ int writeUTF(std::string str, bool widen = false);
+ int read(uint8_t &value)
+ {
+ uint8_t buf;
+
+ int ret = readData(&buf, 1);
+ if (ret == 1)
+ value = buf;
+ return ret;
+ }
+ int read(uint16_t &value)
+ {
+ uint8_t buf[2];
+
+ int ret = readData(buf, 2);
+ if (ret == 2)
+ value = (buf[0] << 8) | buf[1];
+ return ret;
+ }
+ int read(char &value)
+ {
+ uint8_t buf;
+
+ int ret = readData(&buf, 1);
+ if (ret == 1)
+ value = (char) buf;
+ return ret;
+ }
+ int read(uint8_t *value, int len)
+ {
+ return readData(value, len);
+ }
+ int read(uint32_t &value)
+ {
+ uint8_t buf[4];
+
+ int ret = readData(buf, 4);
+ if (ret == 4)
+ value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8)
| buf[3];
+ return ret;
+ }
+ int read(uint64_t &value)
+ {
+ uint8_t buf[8];
+
+ int ret = readData(buf, 8);
+ if (ret == 8)
+ {
+ value = ((uint64_t) buf[0] << 56) |
+ ((uint64_t) (buf[1] & 255) << 48) |
+ ((uint64_t) (buf[2] & 255) << 40) |
+ ((uint64_t) (buf[3] & 255) << 32) |
+ ((uint64_t) (buf[4] & 255) << 24) |
+ ((uint64_t) (buf[5] & 255) << 16) |
+ ((uint64_t) (buf[6] & 255) << 8) |
+ ((uint64_t) (buf[7] & 255) << 0);
+ }
+ return ret;
+ }
+ int readUTF(std::string &str, bool widen = false);
+
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ ProvenanceEventRecord(const ProvenanceEventRecord &parent);
+ ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent);
+
+};
+
+//! Provenance Reporter
+class ProvenanceReporter
+{
+ friend class ProcessSession;
+public:
+ //! Constructor
+ /*!
+ * Create a new provenance reporter associated with the process session
+ */
+ ProvenanceReporter(std::string componentId, std::string componentType) {
+ _logger = Logger::getLogger();
+ _componentId = componentId;
+ _componentType = componentType;
+ }
+
+ //! Destructor
+ virtual ~ProvenanceReporter() {
+ clear();
+ }
+ //! Get events
+ std::set<ProvenanceEventRecord *> getEvents()
+ {
+ return _events;
+ }
+ //! Add event
+ void add(ProvenanceEventRecord *event)
+ {
+ _events.insert(event);
+ }
+ //! Remove event
+ void remove(ProvenanceEventRecord *event)
+ {
+ if (_events.find(event) != _events.end())
+ {
+ _events.erase(event);
+ }
+ }
+ //!
+ //! clear
+ void clear()
+ {
+ for (std::set<ProvenanceEventRecord*>::iterator it =
_events.begin(); it != _events.end(); ++it)
+ {
+ ProvenanceEventRecord *event = (ProvenanceEventRecord
*) (*it);
+ delete event;
+ }
+ _events.clear();
+ }
+ //! allocate
+ ProvenanceEventRecord
*allocate(ProvenanceEventRecord::ProvenanceEventType eventType, FlowFileRecord
*flow)
+ {
+ ProvenanceEventRecord *event = new
ProvenanceEventRecord(eventType, _componentId, _componentType);
+ if (event)
+ event->fromFlowFile(flow);
+
+ return event;
+ }
+ //! commit
+ void commit();
+ //! create
+ void create(FlowFileRecord *flow, std::string detail);
+ //! route
+ void route(FlowFileRecord *flow, Relationship relation, std::string
detail, uint64_t processingDuration);
+ //! modifyAttributes
+ void modifyAttributes(FlowFileRecord *flow, std::string detail);
+ //! modifyContent
+ void modifyContent(FlowFileRecord *flow, std::string detail, uint64_t
processingDuration);
+ //! clone
+ void clone(FlowFileRecord *parent, FlowFileRecord *child);
+ //! join
+ void join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child,
std::string detail, uint64_t processingDuration);
+ //! fork
+ void fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent,
std::string detail, uint64_t processingDuration);
+ //! expire
+ void expire(FlowFileRecord *flow, std::string detail);
+ //! drop
+ void drop(FlowFileRecord *flow, std::string reason);
+ //! send
+ void send(FlowFileRecord *flow, std::string transitUri, std::string
detail, uint64_t processingDuration, bool force);
+ //! fetch
+ void fetch(FlowFileRecord *flow, std::string transitUri, std::string
detail, uint64_t processingDuration);
+ //! receive
+ void receive(FlowFileRecord *flow, std::string transitUri, std::string
sourceSystemFlowFileIdentifier, std::string detail, uint64_t
processingDuration);
+
+protected:
+
+ //! Component ID
+ std::string _componentId;
+ //! Component Type
+ std::string _componentType;
+
+private:
+
+ //! Incoming connection Iterator
+ std::set<ProvenanceEventRecord *> _events;
+ //! Logger
+ Logger *_logger;
+
+ // Prevent default copy constructor and assignment operation
+ // Only support pass by reference or pointer
+ ProvenanceReporter(const ProvenanceReporter &parent);
+ ProvenanceReporter &operator=(const ProvenanceReporter &parent);
+};
+
+#define PROVENANCE_DIRECTORY "./provenance_repository"
+#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
+#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
+
+//! Provenance Repository
+class ProvenanceRepository
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new provenance repository
+ */
+ ProvenanceRepository() {
+ _logger = Logger::getLogger();
+ _configure = Configure::getConfigure();
+ _directory = PROVENANCE_DIRECTORY;
+ _maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME;
+ _purgePeriod = PROVENANCE_PURGE_PERIOD;
+ _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE;
+ _db = NULL;
+ _running = false;
+ _repoFull = false;
+ }
+
+ //! Destructor
+ virtual ~ProvenanceRepository() {
+ stop();
+ if (this->_thread)
+ delete this->_thread;
+ destory();
+ }
+
+ //! initialize
+ bool initialize()
+ {
+ std::string value;
+ if
(_configure->get(Configure::nifi_provenance_repository_directory_default,
value))
+ {
+ _directory = value;
+ }
+ _logger->log_info("NiFi Provenance Repository Directory %s",
_directory.c_str());
+ if
(_configure->get(Configure::nifi_provenance_repository_max_storage_size, value))
+ {
+ Property::StringToInt(value, _maxPartitionBytes);
+ }
+ _logger->log_info("NiFi Provenance Max Partition Bytes %d",
_maxPartitionBytes);
+ if
(_configure->get(Configure::nifi_provenance_repository_max_storage_time, value))
+ {
+ TimeUnit unit;
+ if (Property::StringToTime(value, _maxPartitionMillis,
unit) &&
+
Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
+ {
+ }
+ }
+ _logger->log_info("NiFi Provenance Max Storage Time: [%d] ms",
_maxPartitionMillis);
+ leveldb::Options options;
+ options.create_if_missing = true;
+ leveldb::Status status = leveldb::DB::Open(options,
_directory.c_str(), &_db);
+ if (status.ok())
+ {
+ _logger->log_info("NiFi Provenance Repository database
open %s success", _directory.c_str());
+ }
+ else
+ {
+ _logger->log_error("NiFi Provenance Repository database
open %s fail", _directory.c_str());
+ return false;
+ }
+
+ // start the monitor thread
+ start();
+ return true;
+ }
+ //! Put
+ bool Put(std::string key, uint8_t *buf, int bufLen)
+ {
+ // persistent to the DB
+ leveldb::Slice value((const char *) buf, bufLen);
+ leveldb::Status status;
+ status = _db->Put(leveldb::WriteOptions(), key, value);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Delete
+ bool Delete(std::string key)
+ {
+ leveldb::Status status;
+ status = _db->Delete(leveldb::WriteOptions(), key);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Get
+ bool Get(std::string key, std::string &value)
+ {
+ leveldb::Status status;
+ status = _db->Get(leveldb::ReadOptions(), key, &value);
+ if (status.ok())
+ return true;
+ else
+ return false;
+ }
+ //! Persistent event
+ void registerEvent(ProvenanceEventRecord *event)
+ {
+ event->Serialize(this);
+ }
+ //! Remove event
+ void removeEvent(ProvenanceEventRecord *event)
+ {
+ Delete(event->getEventId());
+ }
+ //! destory
+ void destory()
--- End diff --
The method name `destory` is a misspelling; this should be renamed to `destroy`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---